diff --git a/agent-langchain-ts/package.json b/agent-langchain-ts/package.json
index f1dbda30..0dbd0355 100644
--- a/agent-langchain-ts/package.json
+++ b/agent-langchain-ts/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@databricks/agent-langchain-ts",
   "version": "1.0.0",
-  "description": "TypeScript LangChain agent with MLflow tracing on Databricks",
+  "description": "TypeScript LangChain agent on Databricks, powered by @databricks/appkit",
   "type": "module",
   "engines": {
     "node": ">=22.0.0"
   }
@@ -18,9 +18,9 @@
     "build:ui": "cd ui && npm install && npm run build",
     "test": "jest --testPathIgnorePatterns=examples",
     "test:unit": "jest tests/agent.test.ts",
-    "test:integration": "jest --runInBand tests/framework/integration.test.ts tests/framework/endpoints.test.ts tests/framework/error-handling.test.ts tests/framework/followup-questions.test.ts",
     "test:e2e": "jest --config jest.e2e.config.js",
-    "test:all": "npm run test:unit && npm run test:integration",
+    "test:all": "npm run test:unit",
+    "link:appkit": "npm install --save ../../appkit/packages/appkit/tmp/databricks-appkit-*.tgz",
     "quickstart": "tsx scripts/quickstart.ts",
     "discover-tools": "tsx scripts/discover-tools.ts",
     "lint": "eslint src --ext .ts",
@@ -28,36 +28,20 @@
   },
   "dependencies": {
     "@arizeai/openinference-instrumentation-langchain": "^4.0.0",
-    "@databricks/ai-sdk-provider": "^0.3.0",
+    "@databricks/appkit": "^0.16.0",
     "@databricks/langchainjs": "^0.1.0",
-    "@databricks/sdk-experimental": "0.15.0",
     "@langchain/core": "^1.1.8",
    "@langchain/langgraph": "^1.1.2",
     "@langchain/mcp-adapters": "^1.1.1",
-    "@opentelemetry/api": "^1.9.0",
-    "@opentelemetry/exporter-trace-otlp-proto": "^0.55.0",
-    "@opentelemetry/sdk-trace-node": "^1.28.0",
-    "ai": "^6.0.0",
-    "cors": "^2.8.5",
     "dotenv": "^16.4.5",
-    "express": "^5.1.0",
-    "express-rate-limit": "^8.2.1",
-    "langchain": "^0.3.20",
     "mathjs": "^15.1.0",
-    "openai": "^5.12.2",
     "zod": "^4.3.5"
   },
   "devDependencies": {
-    "@types/cors": "^2.8.17",
-    "@types/express": "^5.0.0",
     "@types/jest": "^29.5.14",
     "@types/node": "^22.0.0",
-    "@typescript-eslint/eslint-plugin": "^8.0.0",
-    "@typescript-eslint/parser": "^8.0.0",
     "concurrently": "^9.2.1",
-    "eslint": "^9.0.0",
     "jest": "^29.7.0",
-    "prettier": "^3.4.0",
     "ts-jest": "^29.2.5",
     "tsx": "^4.19.0",
     "typescript": "^5.7.0"
@@ -65,9 +49,7 @@
   "keywords": [
     "databricks",
     "langchain",
-    "mlflow",
-    "opentelemetry",
-    "tracing",
+    "appkit",
     "agent",
     "typescript"
   ],
diff --git a/agent-langchain-ts/src/agent.ts b/agent-langchain-ts/src/agent.ts
deleted file mode 100644
index 20a2c609..00000000
--- a/agent-langchain-ts/src/agent.ts
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * LangChain agent implementation using standard LangGraph APIs.
- * - * Uses createReactAgent from @langchain/langgraph/prebuilt for: - * - Automatic tool calling and execution - * - Built-in agentic loop - * - Streaming support - * - Standard LangChain message format - */ - -import { ChatDatabricks, DatabricksMCPServer } from "@databricks/langchainjs"; -import { BaseMessage, HumanMessage, SystemMessage } from "@langchain/core/messages"; -import { createReactAgent } from "@langchain/langgraph/prebuilt"; -import { randomUUID } from "crypto"; -import type { - ResponseFunctionToolCall, - ResponseOutputItem, - ResponseOutputItemAddedEvent, - ResponseOutputItemDoneEvent, - ResponseOutputMessage, - ResponseStreamEvent, - ResponseTextDeltaEvent, -} from "openai/resources/responses/responses.js"; -import type { AgentInterface, InvokeParams } from "./framework/agent-interface.js"; -import { getAllTools } from "./tools.js"; - -/** - * Agent configuration - */ -export interface AgentConfig { - /** - * Databricks model serving endpoint name or model ID - * Examples: "databricks-claude-sonnet-4-5", "databricks-gpt-5-2" - */ - model?: string; - - /** - * Whether ChatDatabricks calls the upstream model using the Responses API - * instead of the Chat Completions API. When true, the model response may - * include richer fields (citations, reasoning tokens) from supported endpoints. - * Note: the framework always produces Responses API SSE format regardless of - * this flag — it only affects the upstream model call. - * Default: false - */ - useResponsesApi?: boolean; - - /** - * Temperature for response generation (0.0 - 1.0) - */ - temperature?: number; - - /** - * Maximum tokens to generate - */ - maxTokens?: number; - - /** - * System prompt for the agent - */ - systemPrompt?: string; - - /** - * MCP servers for additional tools - */ - mcpServers?: DatabricksMCPServer[]; - -} - -/** - * Default system prompt for the agent - */ -const DEFAULT_SYSTEM_PROMPT = `You are a helpful AI assistant with access to various tools`; - -/** - * Convert plain message objects to LangChain BaseMessage objects - */ -function convertToBaseMessages(messages: any[]): BaseMessage[] { - return messages.map((msg) => { - if (msg instanceof BaseMessage) { - return msg; - } - - const content = msg.content || ""; - switch (msg.role) { - case "user": - return new HumanMessage(content); - case "assistant": - return { role: "assistant", content } as any; - case "system": - return new SystemMessage(content); - default: - return new HumanMessage(content); - } - }); -} - -/** - * Standard LangGraph agent wrapper - * - * Wraps createReactAgent and implements AgentInterface so the framework - * can call invoke() and stream() without knowing LangGraph internals. - * If you switch to a different SDK, re-implement these two methods. - */ -export class StandardAgent implements AgentInterface { - private agent: Awaited>; - private systemPrompt: string; - - constructor(agent: Awaited>, systemPrompt: string) { - this.agent = agent; - this.systemPrompt = systemPrompt; - } - - /** - * Handle a non-streaming request. - * Returns a ResponseOutputMessage wrapping the final assistant reply. 
- */ - async invoke(params: InvokeParams): Promise { - const { input, chat_history = [] } = params; - - const messages: BaseMessage[] = [ - new SystemMessage(this.systemPrompt), - ...convertToBaseMessages(chat_history), - new HumanMessage(input), - ]; - - const result = await this.agent.invoke({ messages }); - - const finalMessages = result.messages || []; - const lastMessage = finalMessages[finalMessages.length - 1]; - const text = - typeof lastMessage?.content === "string" ? lastMessage.content : ""; - - const outputMessage: ResponseOutputMessage = { - id: `msg_${randomUUID()}`, - type: "message", - role: "assistant", - status: "completed", - content: [{ type: "output_text", text, annotations: [] }], - }; - - return [outputMessage]; - } - - /** - * Handle a streaming request. Yields Responses API SSE events. - * - * Maps LangGraph stream events → ResponseStreamEvent: - * on_tool_start → response.output_item.added / done (function_call) - * on_tool_end → response.output_item.added / done (function_call_output) - * on_chat_model_stream → response.output_text.delta - * - * If you switch SDKs, re-implement this method to yield ResponseStreamEvent - * from your SDK's streaming output. - */ - async *stream(params: InvokeParams): AsyncGenerator { - const { input, chat_history = [] } = params; - - const messages: BaseMessage[] = [ - new SystemMessage(this.systemPrompt), - ...convertToBaseMessages(chat_history), - new HumanMessage(input), - ]; - - const toolCallIds = new Map(); - let seqNum = 0; - let outputIndex = 0; - const textItemId = `msg_${randomUUID()}`; - let textOutputIndex = -1; // set on first text delta - - const eventStream = this.agent.streamEvents({ messages }, { version: "v2" }); - - for await (const event of eventStream) { - // Tool call started — emit function_call output item - if (event.event === "on_tool_start") { - const callId = `call_${randomUUID()}`; - toolCallIds.set(`${event.name}_${event.run_id}`, callId); - - const fcItem: ResponseFunctionToolCall = { - id: `fc_${randomUUID()}`, - call_id: callId, - name: event.name, - arguments: JSON.stringify(event.data?.input || {}), - type: "function_call", - status: "completed", - }; - - const currentIndex = outputIndex++; - - const added: ResponseOutputItemAddedEvent = { - type: "response.output_item.added", - item: fcItem, - output_index: currentIndex, - sequence_number: seqNum++, - }; - yield added; - - const done: ResponseOutputItemDoneEvent = { - type: "response.output_item.done", - item: fcItem, - output_index: currentIndex, - sequence_number: seqNum++, - }; - yield done; - } - - // Tool result received — emit function_call_output item - if (event.event === "on_tool_end") { - const toolKey = `${event.name}_${event.run_id}`; - const callId = toolCallIds.get(toolKey) || `call_${randomUUID()}`; - toolCallIds.delete(toolKey); - - const outputItem = { - id: `fco_${randomUUID()}`, - call_id: callId, - output: JSON.stringify(event.data?.output || ""), - type: "function_call_output" as const, - }; - - const currentIndex = outputIndex++; - - yield { - type: "response.output_item.added", - item: outputItem, - output_index: currentIndex, - sequence_number: seqNum++, - } as unknown as ResponseStreamEvent; - - yield { - type: "response.output_item.done", - item: outputItem, - output_index: currentIndex, - sequence_number: seqNum++, - } as unknown as ResponseStreamEvent; - } - - // Text chunk from LLM - if (event.event === "on_chat_model_stream") { - const content = event.data?.chunk?.content; - if (content && typeof content === 
"string") { - // Emit output_item.added for the text message on first delta - if (textOutputIndex === -1) { - textOutputIndex = outputIndex++; - - const msgItem: ResponseOutputMessage = { - id: textItemId, - type: "message", - role: "assistant", - status: "in_progress", - content: [], - }; - const added: ResponseOutputItemAddedEvent = { - type: "response.output_item.added", - item: msgItem, - output_index: textOutputIndex, - sequence_number: seqNum++, - }; - yield added; - } - - const delta: ResponseTextDeltaEvent = { - type: "response.output_text.delta", - item_id: textItemId, - output_index: textOutputIndex, - content_index: 0, - delta: content, - logprobs: [], - sequence_number: seqNum++, - }; - yield delta; - } - } - } - - // Close the text output item if we streamed any text - if (textOutputIndex !== -1) { - const msgItem: ResponseOutputMessage = { - id: textItemId, - type: "message", - role: "assistant", - status: "completed", - content: [], - }; - const done: ResponseOutputItemDoneEvent = { - type: "response.output_item.done", - item: msgItem, - output_index: textOutputIndex, - sequence_number: seqNum++, - }; - yield done; - } - - // Signal end of response. - yield { - type: "response.completed", - sequence_number: seqNum++, - response: {} as any, - } as unknown as ResponseStreamEvent; - } -} - -/** - * Create a tool-calling agent with ChatDatabricks - * - * Uses standard LangGraph createReactAgent API: - * - Automatic tool calling and execution - * - Built-in agentic loop with reasoning - * - Streaming support out of the box - * - Compatible with MCP tools - * - * @param config Agent configuration - * @returns AgentInterface instance - */ -export async function createAgent(config: AgentConfig = {}): Promise { - const { - model: modelName = "databricks-claude-sonnet-4-5", - useResponsesApi = false, - temperature = 0.1, - maxTokens = 2000, - systemPrompt = DEFAULT_SYSTEM_PROMPT, - mcpServers, - } = config; - - // Create chat model - const model = new ChatDatabricks({ - model: modelName, - useResponsesApi, - temperature, - maxTokens, - }); - - // Load tools (basic + MCP if configured) - const tools = await getAllTools(mcpServers); - - console.log(`✅ Agent initialized with ${tools.length} tool(s)`); - console.log(` Tools: ${tools.map((t) => t.name).join(", ")}`); - - // Create agent using standard LangGraph API - const agent = createReactAgent({ - llm: model, - tools, - }); - - return new StandardAgent(agent, systemPrompt); -} - diff --git a/agent-langchain-ts/src/framework/agent-interface.ts b/agent-langchain-ts/src/framework/agent-interface.ts deleted file mode 100644 index eb30d911..00000000 --- a/agent-langchain-ts/src/framework/agent-interface.ts +++ /dev/null @@ -1,38 +0,0 @@ -import type { - ResponseOutputItem, - ResponseStreamEvent, -} from "openai/resources/responses/responses.js"; - -export interface InvokeParams { - input: string; - chat_history?: Array<{ role: string; content: string }>; -} - -/** - * Interface your agent must implement. - * - * The framework calls invoke() and stream() with a simplified request object. - * Return Responses API-native output types so the framework can serialize them - * correctly over HTTP. - * - * You are responsible for translating your SDK's output (LangChain, OpenAI SDK, - * etc.) into these types. If you switch SDKs, update the implementations here. - */ -export interface AgentInterface { - /** - * Handle a non-streaming request. Return the output items to include in the response. 
- */ - invoke(params: InvokeParams): Promise; - - /** - * Handle a streaming request. Yield Responses API SSE events. - * - * Must yield events in this order: - * response.output_item.added → response.output_text.delta (×N) → response.output_item.done - * followed by response.completed at the end. - * - * If you switch SDKs, re-implement this method to yield ResponseStreamEvent - * from your SDK's streaming output. - */ - stream(params: InvokeParams): AsyncGenerator; -} diff --git a/agent-langchain-ts/src/framework/routes/invocations.ts b/agent-langchain-ts/src/framework/routes/invocations.ts deleted file mode 100644 index e71e278f..00000000 --- a/agent-langchain-ts/src/framework/routes/invocations.ts +++ /dev/null @@ -1,178 +0,0 @@ -/** - * MLflow-compatible /invocations endpoint (also aliased as /responses). - * - * This endpoint provides a standard Responses API interface that: - * - Accepts Responses API request format (including the OpenAI SDK wire format) - * - Parses the request into simplified InvokeParams for the agent - * - Streams/returns events produced by AgentInterface.stream() / AgentInterface.invoke() - * - * The agent owns the translation from its SDK's format → ResponseStreamEvent. - * This file is purely a pass-through layer. - */ - -import { Router, type Request, type Response } from "express"; -import type { AgentInterface } from "../agent-interface.js"; -import { z } from "zod"; - -/** - * Responses API request schema. - * - * Accepts both the MLflow format (input as array of messages) and the - * OpenAI SDK wire format (input as string or array, optional model field). - */ -const responsesRequestSchema = z.object({ - input: z.union([ - z.string(), - z.array( - z.union([ - z.object({ - role: z.enum(["user", "assistant", "system"]), - content: z.union([ - z.string(), - z.array( - z.union([ - z.object({ type: z.string(), text: z.string() }).passthrough(), - z.object({ type: z.string() }).passthrough(), - ]) - ), - ]), - }), - z.object({ type: z.string() }).passthrough(), - ]) - ), - ]), - stream: z.boolean().optional().default(true), - // Accept (and ignore) model field sent by the OpenAI SDK - model: z.string().optional(), -}); - -/** - * Create invocations router with the given agent. - * Mount at both /invocations (MLflow) and /responses (OpenAI SDK compatibility). - */ -export function createInvocationsRouter(agent: AgentInterface): ReturnType { - const router = Router(); - - router.post("/", async (req: Request, res: Response) => { - try { - // Parse and validate request - const parsed = responsesRequestSchema.safeParse(req.body); - if (!parsed.success) { - return res.status(400).json({ - error: "Invalid request format", - details: parsed.error.format(), - }); - } - - const { stream } = parsed.data; - - // Normalise input: string → single user message - const input = - typeof parsed.data.input === "string" - ? 
[{ role: "user" as const, content: parsed.data.input }] - : parsed.data.input; - - // Extract the latest user message - const userMessages = input.filter((msg: any) => msg.role === "user"); - if (userMessages.length === 0) { - return res.status(400).json({ - error: "No user message found in input", - }); - } - - const lastUserMessage = userMessages[userMessages.length - 1]; - - // Handle both string and array content formats - let userInput: string; - if (Array.isArray(lastUserMessage.content)) { - userInput = lastUserMessage.content - .filter((part: any) => part.type === "input_text" || part.type === "text") - .map((part: any) => part.text) - .join("\n"); - } else { - userInput = lastUserMessage.content as string; - } - - // Convert Responses API input to simple chat history for the agent. - // Preserve tool call context for followup questions. - const chatHistory = input.slice(0, -1).map((item: any) => { - if (item.type === "function_call") { - return { - role: "assistant", - content: `[Tool Call: ${item.name}(${item.arguments})]`, - }; - } else if (item.type === "function_call_output") { - return { - role: "assistant", - content: `[Tool Result: ${item.output}]`, - }; - } - - if (Array.isArray(item.content)) { - const textParts = item.content - .filter((part: any) => - part.type === "input_text" || - part.type === "output_text" || - part.type === "text" - ) - .map((part: any) => part.text); - - const toolParts = item.content - .filter((part: any) => - part.type === "function_call" || - part.type === "function_call_output" - ) - .map((part: any) => { - if (part.type === "function_call") { - return `[Tool Call: ${part.name}(${JSON.stringify(part.arguments)})]`; - } else if (part.type === "function_call_output") { - return `[Tool Result: ${part.output}]`; - } - return ""; - }); - - const allParts = [...textParts, ...toolParts].filter((p) => p.length > 0); - return { ...item, content: allParts.join("\n") }; - } - return item; - }); - - const agentParams = { input: userInput, chat_history: chatHistory }; - - // Streaming response: write each ResponseStreamEvent directly as SSE - if (stream) { - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - - try { - for await (const event of agent.stream(agentParams)) { - res.write(`data: ${JSON.stringify(event)}\n\n`); - } - res.write("data: [DONE]\n\n"); - res.end(); - } catch (error: unknown) { - const message = error instanceof Error ? error.message : String(error); - console.error("Streaming error:", error); - res.write(`data: ${JSON.stringify({ type: "error", error: message })}\n\n`); - res.write(`data: ${JSON.stringify({ type: "response.failed" })}\n\n`); - res.write("data: [DONE]\n\n"); - res.end(); - } - } else { - // Non-streaming response: return output items directly - const items = await agent.invoke(agentParams); - res.json({ output: items }); - } - } catch (error: unknown) { - const message = error instanceof Error ? error.message : String(error); - console.error("Agent invocation error:", error); - res.status(500).json({ - error: "Internal server error", - message, - }); - } - }); - - return router; -} diff --git a/agent-langchain-ts/src/framework/server.ts b/agent-langchain-ts/src/framework/server.ts deleted file mode 100644 index cb9ec0a3..00000000 --- a/agent-langchain-ts/src/framework/server.ts +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Express server for the LangChain agent with MLflow tracing. 
- * - * Provides: - * - /invocations endpoint (MLflow-compatible Responses API) - * - Health check endpoint - * - MLflow trace export via OpenTelemetry - * - * Note: This server is UI-agnostic. The UI (e2e-chatbot-app-next) runs separately - * and proxies to /invocations via the API_PROXY environment variable. - */ - -import express, { Request, Response } from "express"; -import cors from "cors"; -import { config } from "dotenv"; -import path from "path"; -import { fileURLToPath } from "url"; -import { existsSync } from "fs"; -import { - initializeMLflowTracing, - type MLflowTracing, -} from "./tracing.js"; -import { createInvocationsRouter } from "./routes/invocations.js"; -import { closeMCPClient } from "../tools.js"; -import type { AgentInterface } from "./agent-interface.js"; - -// Load environment variables -config(); - -/** - * Server configuration - */ -interface ServerConfig { - port: number; -} - -const SERVICE_INFO = { - service: "LangChain Agent TypeScript", - version: "1.0.0", - endpoints: { - health: "GET /health", - invocations: "POST /invocations (Responses API)", - }, -}; - -/** - * Register SIGINT/SIGTERM handlers that flush tracing and close MCP connections. - */ -function setupShutdownHandlers(tracing: MLflowTracing): void { - const shutdown = async (signal: string) => { - console.log(`\nReceived ${signal}, shutting down...`); - try { - await closeMCPClient(); - await tracing.flush(); - await tracing.shutdown(); - process.exit(0); - } catch (error) { - console.error("Error during shutdown:", error); - process.exit(1); - } - }; - - process.on("SIGINT", () => shutdown("SIGINT")); - process.on("SIGTERM", () => shutdown("SIGTERM")); - process.on("beforeExit", () => tracing.flush()); -} - -/** - * Initialize the Express server - */ -export async function createServer( - agent: AgentInterface, - serverConfig: ServerConfig -): Promise { - const app = express(); - - // Middleware - app.use(cors()); - app.use(express.json({ limit: '10mb' })); // Protect against large payload DoS - - // Initialize MLflow tracing - const tracing = await initializeMLflowTracing({ - serviceName: "langchain-agent-ts", - experimentId: process.env.MLFLOW_EXPERIMENT_ID, - }); - - setupShutdownHandlers(tracing); - - /** - * Health check endpoint - */ - app.get("/health", (_req: Request, res: Response) => { - res.json({ - status: "healthy", - timestamp: new Date().toISOString(), - service: "langchain-agent-ts", - }); - }); - - // Mount handler at /invocations (MLflow) and /responses (OpenAI SDK compatibility) - const invocationsRouter = createInvocationsRouter(agent); - app.use("/invocations", invocationsRouter); - app.use("/responses", invocationsRouter); - - console.log("✅ Agent endpoints mounted (/invocations, /responses)"); - - // Production UI serving (optional - only if UI is deployed) - const uiBackendUrl = process.env.UI_BACKEND_URL; - if (uiBackendUrl) { - console.log(`🔗 Proxying /api/* to UI backend: ${uiBackendUrl}`); - - // Proxy /api/* routes to UI backend server - app.use("/api/*path", async (req, res) => { - try { - const targetUrl = `${uiBackendUrl}${req.originalUrl}`; - const response = await fetch(targetUrl, { - method: req.method, - headers: req.headers as Record, - body: req.method !== "GET" && req.method !== "HEAD" ? 
JSON.stringify(req.body) : undefined, - }); - - // Copy response headers - response.headers.forEach((value, key) => { - res.setHeader(key, value); - }); - - res.status(response.status); - - // Stream response body - if (response.body) { - const reader = response.body.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - res.write(value); - } - } - res.end(); - } catch (error) { - console.error("Error proxying to UI backend:", error); - res.status(502).json({ error: "Bad Gateway" }); - } - }); - - // Serve UI static files from ui/client/dist - const __filename = fileURLToPath(import.meta.url); - const __dirname = path.dirname(__filename); - const uiDistPath = path.join(__dirname, "..", "..", "..", "ui", "client", "dist"); - - if (existsSync(uiDistPath)) { - console.log(`📂 Serving UI static files from: ${uiDistPath}`); - app.use(express.static(uiDistPath)); - - // SPA fallback - serve index.html for all non-API routes - app.get("*path", (_req: Request, res: Response) => { - res.sendFile(path.join(uiDistPath, "index.html")); - }); - } else { - console.warn(`⚠️ UI dist path not found: ${uiDistPath}`); - app.get("/", (_req: Request, res: Response) => { res.json(SERVICE_INFO); }); - } - } else { - // Agent-only mode: service info at root - app.get("/", (_req: Request, res: Response) => { res.json(SERVICE_INFO); }); - } - - return app; -} - -/** - * Start the server - */ -export async function startServer(agent: AgentInterface, config?: { port?: number }) { - const port = config?.port ?? parseInt(process.env.PORT || "8000", 10); - - const app = await createServer(agent, { port }); - - app.listen(port, () => { - console.log(`\n🚀 Agent Server running on http://localhost:${port}`); - console.log(` Health: http://localhost:${port}/health`); - console.log(` Invocations API: http://localhost:${port}/invocations`); - console.log(`\n📊 MLflow tracking enabled`); - console.log(` Experiment: ${process.env.MLFLOW_EXPERIMENT_ID || "default"}`); - }); -} diff --git a/agent-langchain-ts/src/framework/tracing.ts b/agent-langchain-ts/src/framework/tracing.ts deleted file mode 100644 index b2bffa6f..00000000 --- a/agent-langchain-ts/src/framework/tracing.ts +++ /dev/null @@ -1,393 +0,0 @@ -/** - * MLflow tracing setup using OpenTelemetry for LangChain instrumentation. 
- * - * This module configures automatic trace export to MLflow, capturing: - * - LangChain operations (LLM calls, tool invocations, chain executions) - * - Span timing and hierarchy - * - Input/output data - * - Metadata and attributes - */ - -import { - NodeTracerProvider, - SimpleSpanProcessor, - BatchSpanProcessor, -} from "@opentelemetry/sdk-trace-node"; -import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto"; -import { LangChainInstrumentation } from "@arizeai/openinference-instrumentation-langchain"; -import * as CallbackManagerModule from "@langchain/core/callbacks/manager"; -import { Resource } from "@opentelemetry/resources"; -import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; -import { WorkspaceClient } from "@databricks/sdk-experimental"; - -export interface TracingConfig { - /** MLflow tracking URI (defaults to "databricks") */ - mlflowTrackingUri?: string; - - /** MLflow experiment ID to associate traces with */ - experimentId?: string; - - /** - * MLflow run ID to nest traces under (optional) - */ - runId?: string; - - /** - * Service name for trace identification - */ - serviceName?: string; - - /** - * Whether to use batch or simple span processor - * Batch is more efficient for production, simple is better for debugging - */ - useBatchProcessor?: boolean; -} - -export class MLflowTracing { - private provider: NodeTracerProvider; - private exporter!: OTLPTraceExporter; // Will be initialized in initialize() - private isInitialized = false; - private databricksClient?: WorkspaceClient; - - constructor(private config: TracingConfig = {}) { - // Set defaults - this.config.mlflowTrackingUri = config.mlflowTrackingUri || - process.env.MLFLOW_TRACKING_URI || - "databricks"; - this.config.experimentId = config.experimentId || - process.env.MLFLOW_EXPERIMENT_ID; - this.config.runId = config.runId || - process.env.MLFLOW_RUN_ID; - this.config.serviceName = config.serviceName || - "langchain-agent-ts"; - this.config.useBatchProcessor = config.useBatchProcessor ?? 
(process.env.OTEL_USE_BATCH_PROCESSOR !== "false"); - - // Note: Exporter will be created in initialize() after fetching auth token - this.provider = new NodeTracerProvider({ - resource: new Resource({ - [ATTR_SERVICE_NAME]: this.config.serviceName, - }), - }); - } - - /** - * Normalize host URL by adding https:// if needed - */ - private normalizeHost(host: string): string { - if (!host.startsWith("http://") && !host.startsWith("https://")) { - return `https://${host}`; - } - return host; - } - - /** - * Build MLflow trace endpoint URL - * Uses Databricks OTel collector endpoints (preview feature) - */ - private buildTraceUrl(): string { - const baseUri = this.config.mlflowTrackingUri; - - // Databricks workspace tracking - if (baseUri === "databricks") { - const rawHost = process.env.DATABRICKS_HOST; - if (!rawHost) { - throw new Error( - "DATABRICKS_HOST environment variable required when using 'databricks' tracking URI" - ); - } - const host = this.normalizeHost(rawHost); - return `${host.replace(/\/$/, "")}/api/2.0/otel/v1/traces`; - } - - // Local or custom MLflow server - return `${baseUri}/v1/traces`; - } - - - /** - * Link experiment to existing UC trace location - * This only requires the catalog/schema to exist, not a warehouse - */ - private async linkExperimentToLocation( - catalogName: string, - schemaName: string, - tableName: string - ): Promise { - if (!this.config.experimentId || !this.databricksClient) { - return null; - } - - try { - await this.databricksClient.apiClient.request({ - path: `/api/4.0/mlflow/traces/${this.config.experimentId}/link-location`, - method: "POST", - headers: new Headers({ "Content-Type": "application/json" }), - payload: { - experiment_id: this.config.experimentId, - uc_schema: { - catalog_name: catalogName, - schema_name: schemaName, - }, - }, - raw: false, - }); - - console.log(`✅ Experiment linked to UC trace location: ${tableName}`); - return tableName; - - } catch (error) { - console.warn(`⚠️ Error linking experiment to trace location:`, error); - return null; - } - } - - /** - * Set up experiment trace location in Unity Catalog - * Creates UC storage location and links experiment to it - * - * This implements the MLflow set_experiment_trace_location() API in TypeScript - */ - private async setupExperimentTraceLocation(): Promise { - if (!this.config.experimentId || !this.databricksClient) { - return null; - } - - const catalogName = process.env.OTEL_UC_CATALOG || "main"; - const schemaName = process.env.OTEL_UC_SCHEMA || "agent_traces"; - const warehouseId = process.env.MLFLOW_TRACING_SQL_WAREHOUSE_ID; - const tableName = `${catalogName}.${schemaName}.mlflow_experiment_trace_otel_spans`; - - // If no warehouse is specified, try to link directly (works if table already exists) - if (!warehouseId) { - console.log(`⚠️ MLFLOW_TRACING_SQL_WAREHOUSE_ID not set, attempting to link to existing table: ${tableName}`); - return await this.linkExperimentToLocation(catalogName, schemaName, tableName); - } - - try { - console.log(`🔗 Setting up trace location: ${catalogName}.${schemaName}`); - - // Step 1: Create UC storage location - await this.databricksClient.apiClient.request({ - path: "/api/4.0/mlflow/traces/location", - method: "POST", - headers: new Headers({ "Content-Type": "application/json" }), - payload: { - uc_schema: { - catalog_name: catalogName, - schema_name: schemaName, - }, - sql_warehouse_id: warehouseId, - }, - raw: false, - }); - - return await this.linkExperimentToLocation(catalogName, schemaName, tableName); - - } catch (error: 
any) { - // 409 means location already exists, which is fine - if (error?.message?.includes("409")) { - return await this.linkExperimentToLocation(catalogName, schemaName, tableName); - } - console.warn(`⚠️ Error setting up trace location:`, error); - return null; - } - } - - /** - * Build headers for trace export using SDK authentication - * Includes required headers for Databricks OTel collector - */ - private async buildHeadersWithToken(): Promise> { - const headers: Record = {}; - - // Get authentication headers from SDK - if (this.databricksClient) { - const authHeaders = new Headers(); - await this.databricksClient.config.authenticate(authHeaders); - - // Convert Headers to plain object - authHeaders.forEach((value, key) => { - headers[key] = value; - }); - } else if (this.config.mlflowTrackingUri === "databricks") { - console.warn( - "⚠️ No Databricks client available for trace export. Traces may not be exported." - ); - } - - // Required for Databricks OTel collector - if (this.config.mlflowTrackingUri === "databricks") { - headers["content-type"] = "application/x-protobuf"; - - // Unity Catalog table name for trace storage - const ucTableName = process.env.OTEL_UC_TABLE_NAME; - if (ucTableName) { - headers["X-Databricks-UC-Table-Name"] = ucTableName; - console.log(`📊 Traces will be stored in UC table: ${ucTableName}`); - } else { - console.warn( - "⚠️ OTEL_UC_TABLE_NAME not set. You need to:\n" + - " 1. Enable OTel collector preview in your workspace\n" + - " 2. Create UC tables for trace storage\n" + - " 3. Set OTEL_UC_TABLE_NAME=.._otel_spans" - ); - } - } - - // Add experiment ID if provided - if (this.config.experimentId) { - headers["x-mlflow-experiment-id"] = this.config.experimentId; - } - - // Add run ID if provided - if (this.config.runId) { - headers["x-mlflow-run-id"] = this.config.runId; - } - - return headers; - } - - /** - * Initialize tracing - registers the tracer provider and instruments LangChain - */ - async initialize(): Promise { - if (this.isInitialized) { - console.warn("MLflow tracing already initialized"); - return; - } - - // No-op mode: skip all tracing setup (used in tests) - if (this.config.mlflowTrackingUri === "noop") { - this.isInitialized = true; - console.log("⏭️ MLflow tracing disabled (MLFLOW_TRACKING_URI=noop)"); - return; - } - - // Initialize Databricks SDK client for authentication - if (this.config.mlflowTrackingUri === "databricks") { - console.log("🔐 Initializing Databricks SDK authentication..."); - - try { - // Create WorkspaceClient - automatically handles auth chain: - // 1. Databricks Native (PAT, OAuth M2M, OAuth U2M) - // 2. Azure Native (Azure CLI, MSI, Client Secret) - // 3. GCP Native (GCP credentials, default application credentials) - // 4. 
Databricks CLI profile - this.databricksClient = new WorkspaceClient({ - profile: process.env.DATABRICKS_CONFIG_PROFILE, - host: process.env.DATABRICKS_HOST, - token: process.env.DATABRICKS_TOKEN, - clientId: process.env.DATABRICKS_CLIENT_ID, - clientSecret: process.env.DATABRICKS_CLIENT_SECRET, - }); - - // Verify authentication works by getting config - await this.databricksClient.config.ensureResolved(); - console.log("✅ Databricks SDK authentication successful"); - - // Set up experiment trace location in UC (if not already configured) - if (!process.env.OTEL_UC_TABLE_NAME) { - const tableName = await this.setupExperimentTraceLocation(); - if (tableName) { - // Set environment variable so buildHeadersWithToken() can use it - process.env.OTEL_UC_TABLE_NAME = tableName; - } - } - } catch (error) { - console.warn("⚠️ Failed to initialize Databricks SDK authentication:", error); - console.warn("⚠️ Traces may not be exported without authentication"); - } - } - - // Build headers with SDK authentication - const headers = await this.buildHeadersWithToken(); - - // Construct trace endpoint URL - const traceUrl = this.buildTraceUrl(); - - // Log detailed export configuration for debugging - console.log("🔍 OTel Export Configuration:"); - console.log(" URL:", traceUrl); - console.log(" Headers:", Object.keys(headers).join(", ")); - // Check for both lowercase and capitalized Authorization header - const hasAuth = headers["Authorization"] || headers["authorization"]; - console.log(" Auth:", hasAuth ? "Present (Bearer token)" : "Missing"); - console.log(" Content-Type:", headers["content-type"]); - console.log(" UC Table:", headers["X-Databricks-UC-Table-Name"] || "Not set"); - console.log(" Experiment ID:", headers["x-mlflow-experiment-id"] || "Not set"); - - // Create OTLP exporter with headers - this.exporter = new OTLPTraceExporter({ - url: traceUrl, - headers, - timeoutMillis: 30000, - }); - - // Add span processor with error handling - const processor = this.config.useBatchProcessor - ? 
new BatchSpanProcessor(this.exporter) - : new SimpleSpanProcessor(this.exporter); - - this.provider.addSpanProcessor(processor); - - // Register the tracer provider globally - this.provider.register(); - - // Instrument LangChain callbacks to emit traces - new LangChainInstrumentation().manuallyInstrument(CallbackManagerModule); - - this.isInitialized = true; - - console.log("✅ MLflow tracing initialized", { - serviceName: this.config.serviceName, - experimentId: this.config.experimentId, - trackingUri: this.config.mlflowTrackingUri, - hasAuthClient: !!this.databricksClient, - }); - } - - /** - * Shutdown tracing gracefully - flushes pending spans - */ - async shutdown(): Promise { - if (!this.isInitialized) { - return; - } - - try { - await this.provider.shutdown(); - console.log("✅ MLflow tracing shutdown complete"); - } catch (error) { - console.error("Error shutting down tracing:", error); - throw error; - } - } - - /** - * Force flush pending spans (useful before process exit) - */ - async flush(): Promise { - if (!this.isInitialized) { - return; - } - - try { - await this.provider.forceFlush(); - } catch (error) { - console.error("Error flushing traces:", error); - throw error; - } - } -} - -/** - * Initialize MLflow tracing with default configuration - * Call this once at application startup - */ -export async function initializeMLflowTracing(config?: TracingConfig): Promise { - const tracing = new MLflowTracing(config); - await tracing.initialize(); - return tracing; -} - diff --git a/agent-langchain-ts/src/main.ts b/agent-langchain-ts/src/main.ts index d52483aa..d4db07d2 100644 --- a/agent-langchain-ts/src/main.ts +++ b/agent-langchain-ts/src/main.ts @@ -1,19 +1,31 @@ import { config } from "dotenv"; config(); -import { createAgent } from "./agent.js"; +import { createApp, agent, server } from "@databricks/appkit"; +import { basicTools } from "./tools.js"; import { getMCPServers } from "./mcp-servers.js"; -import { startServer } from "./framework/server.js"; -const agent = await createAgent({ - model: process.env.DATABRICKS_MODEL || "databricks-claude-sonnet-4-5", - temperature: parseFloat(process.env.TEMPERATURE || "0.1"), - maxTokens: parseInt(process.env.MAX_TOKENS || "2000", 10), - useResponsesApi: process.env.USE_RESPONSES_API === "true", - mcpServers: getMCPServers(), +const app = await createApp({ + plugins: [ + agent({ + model: process.env.DATABRICKS_MODEL || "databricks-claude-sonnet-4-5", + useResponsesApi: process.env.USE_RESPONSES_API === "true", + temperature: parseFloat(process.env.TEMPERATURE || "0.1"), + maxTokens: parseInt(process.env.MAX_TOKENS || "2000", 10), + mcpServers: getMCPServers(), + tools: basicTools, + traceDestination: { type: "mlflow" }, + }), + server({ + autoStart: false, + }), + ], }); -startServer(agent).catch((error) => { - console.error("❌ Failed to start server:", error); - process.exit(1); +// Databricks Apps platform expects /invocations at root +app.server.extend((expressApp) => { + expressApp.post("/invocations", (req, res) => res.redirect(307, "/api/agent")); + expressApp.post("/responses", (req, res) => res.redirect(307, "/api/agent")); }); + +await app.server.start(); diff --git a/agent-langchain-ts/tests/agent.test.ts b/agent-langchain-ts/tests/agent.test.ts index d36375a0..4700ba66 100644 --- a/agent-langchain-ts/tests/agent.test.ts +++ b/agent-langchain-ts/tests/agent.test.ts @@ -1,90 +1,75 @@ /** - * Tests for the LangChain agent — USER CODE SPACE + * Tests for the agent via AppKit. 
* - * These tests exercise src/agent.ts and src/tools.ts directly. - * When you modify your agent (change tools, swap the model, update the system - * prompt), update these tests to match. They are intentionally NOT part of the - * framework test suite so that framework tests stay decoupled from user code. + * These tests exercise the agent plugin with the template's tools. + * They require a live Databricks model endpoint (DATABRICKS_MODEL env var). */ import { describe, test, expect, beforeAll } from "@jest/globals"; -import type { ResponseStreamEvent } from "openai/resources/responses/responses.js"; -import { createAgent } from "../src/agent.js"; -import { getOutput } from "./helpers.js"; +import { createApp, agent, server } from "@databricks/appkit"; +import type { AgentInterface, ResponseStreamEvent } from "@databricks/appkit"; +import { basicTools } from "../src/tools.js"; describe("Agent", () => { - let agent: Awaited>; + let app: Awaited>; beforeAll(async () => { - // Create agent with basic tools only (no MCP for tests) - agent = await createAgent({ - model: process.env.DATABRICKS_MODEL || "databricks-claude-sonnet-4-5", - temperature: 0, + app = await createApp({ + plugins: [ + agent({ + model: process.env.DATABRICKS_MODEL || "databricks-claude-sonnet-4-5", + temperature: 0, + tools: basicTools, + }), + server({ autoStart: false }), + ], }); }); test("should initialize agent successfully", () => { - expect(agent).toBeDefined(); + expect(app.agent).toBeDefined(); }); test("should respond to simple queries", async () => { - const result = await agent.invoke({ - input: "Hello, how are you?", - }); + const result = await app.agent.invoke([ + { role: "user", content: "Hello, how are you?" }, + ]); - expect(result).toBeDefined(); - const output = getOutput(result); - expect(output).toBeTruthy(); - expect(typeof output).toBe("string"); + expect(result).toBeTruthy(); + expect(typeof result).toBe("string"); + expect(result.length).toBeGreaterThan(0); }, 30000); test("should use time tool", async () => { - const result = await agent.invoke({ - input: "What time is it in Tokyo?", - }); - - expect(result).toBeDefined(); - const output = getOutput(result); - expect(output).toBeTruthy(); + const result = await app.agent.invoke([ + { role: "user", content: "What time is it in Tokyo?" }, + ]); - // Verify time tool was used by checking output mentions time + expect(result).toBeTruthy(); const mentionsTime = - output.toLowerCase().includes("time") || - /\d{1,2}:\d{2}/.test(output) || // Matches HH:MM format - output.toLowerCase().includes("tokyo"); + result.toLowerCase().includes("time") || + /\d{1,2}:\d{2}/.test(result) || + result.toLowerCase().includes("tokyo"); expect(mentionsTime).toBe(true); }, 30000); - test("should handle multi-turn conversations", async () => { - const firstResult = await agent.invoke({ - input: "My name is Alice.", - chat_history: [], - }); - - const firstOutput = getOutput(firstResult); - expect(firstOutput).toBeTruthy(); - - const secondResult = await agent.invoke({ - input: "What is my name?", - chat_history: [ - { role: "user", content: "My name is Alice." 
}, - { role: "assistant", content: firstOutput }, - ], - }); - - const secondOutput = getOutput(secondResult); - expect(secondOutput.toLowerCase()).toContain("alice"); - }, 60000); - - test("should emit function_call event when calling a tool", async () => { + test("should stream responses", async () => { const events: ResponseStreamEvent[] = []; - for await (const event of agent.stream({ input: "What time is it in Tokyo?" })) { + for await (const event of app.agent.stream([ + { role: "user", content: "Say hello briefly" }, + ])) { events.push(event as ResponseStreamEvent); } - const toolCallEvent = events.find( - (e) => e.type === "response.output_item.done" && (e as any).item?.type === "function_call" + + expect(events.length).toBeGreaterThan(0); + const hasDelta = events.some( + (e) => e.type === "response.output_text.delta", + ); + expect(hasDelta).toBe(true); + + const hasCompleted = events.some( + (e) => e.type === "response.completed", ); - expect(toolCallEvent).toBeDefined(); - expect((toolCallEvent as any).item.name).toBe("get_current_time"); + expect(hasCompleted).toBe(true); }, 30000); }); diff --git a/agent-langchain-ts/tests/framework/endpoints.test.ts b/agent-langchain-ts/tests/framework/endpoints.test.ts deleted file mode 100644 index 23de542b..00000000 --- a/agent-langchain-ts/tests/framework/endpoints.test.ts +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Integration tests for API endpoints - * Tests both /invocations (Responses API) and /api/chat (AI SDK + useChat) - */ - -import { describe, test, expect, beforeAll, afterAll } from "@jest/globals"; -import { spawn } from "child_process"; -import type { ChildProcess } from "child_process"; -import OpenAI from "openai"; - -describe("API Endpoints", () => { - let agentProcess: ChildProcess; - const PORT = 5555; // Use different port to avoid conflicts - const BASE_URL = `http://localhost:${PORT}`; - let client: OpenAI; - - beforeAll(async () => { - // Start framework server with stub agent (no LLM required) - agentProcess = spawn("node_modules/.bin/tsx", ["tests/framework/stub-server.ts"], { - env: { ...process.env, PORT: PORT.toString(), MLFLOW_TRACKING_URI: "noop" }, - stdio: ["ignore", "pipe", "pipe"], - }); - - // Poll /health until server is ready (max 20s) - const start = Date.now(); - while (Date.now() - start < 20000) { - try { - const r = await fetch(`${BASE_URL}/health`); - if (r.ok) break; - } catch {} - await new Promise((r) => setTimeout(r, 200)); - } - - client = new OpenAI({ baseURL: BASE_URL, apiKey: "not-needed" }); - }, 30000); - - afterAll(async () => { - if (agentProcess) { - agentProcess.kill(); - } - }); - - describe("/invocations endpoint", () => { - test("should respond with Responses API format", async () => { - const stream = await client.responses.create({ - model: "test-model", - input: [{ role: "user", content: "Say 'test' and nothing else" }], - stream: true, - }); - - let fullText = ""; - let hasTextDelta = false; - let hasCompleted = false; - - for await (const event of stream) { - if (event.type === "response.output_text.delta") { - fullText += event.delta; - hasTextDelta = true; - } - if (event.type === "response.completed") { - hasCompleted = true; - } - } - - expect(hasTextDelta).toBe(true); - expect(hasCompleted).toBe(true); - }, 30000); - - test("should work via /responses alias", async () => { - // The /responses alias allows the OpenAI SDK to use the endpoint natively - const stream = await client.responses.create({ - model: "test-model", - input: [{ role: "user", content: "Say 'SDK test'" 
}], - stream: true, - }); - - let hasTextDelta = false; - - for await (const event of stream) { - if (event.type === "response.output_text.delta") { - hasTextDelta = true; - } - } - - expect(hasTextDelta).toBe(true); - }, 30000); - - }); -}); diff --git a/agent-langchain-ts/tests/framework/error-handling.test.ts b/agent-langchain-ts/tests/framework/error-handling.test.ts deleted file mode 100644 index dc5fa2fa..00000000 --- a/agent-langchain-ts/tests/framework/error-handling.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Error handling tests for agent endpoints - * Tests error scenarios including SSE completion and request validation - * - * Run with: npm run test:integration - */ - -import { describe, test, expect, beforeAll, afterAll } from '@jest/globals'; -import { spawn } from "child_process"; -import type { ChildProcess } from "child_process"; -import { - parseSSEStream, - assertStreamComplete, -} from '../helpers.js'; - -const PORT = 5558; -const AGENT_URL = `http://localhost:${PORT}`; -let agentProcess: ChildProcess; - -beforeAll(async () => { - agentProcess = spawn("node_modules/.bin/tsx", ["tests/framework/stub-server.ts"], { - env: { ...process.env, PORT: PORT.toString(), MLFLOW_TRACKING_URI: "noop" }, - stdio: ["ignore", "pipe", "pipe"], - }); - const start = Date.now(); - while (Date.now() - start < 20000) { - try { - const r = await fetch(`${AGENT_URL}/health`); - if (r.ok) break; - } catch {} - await new Promise((r) => setTimeout(r, 200)); - } -}, 30000); - -afterAll(() => { - if (agentProcess) agentProcess.kill(); -}); - -describe("Error Handling Tests", () => { - describe("SSE Stream Completion", () => { - test("should send completion events on successful response", async () => { - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [{ role: "user", content: "Say 'test'" }], - stream: true, - }), - }); - - expect(response.ok).toBe(true); - const text = await response.text(); - const { events } = parseSSEStream(text); - - // Verify proper SSE completion sequence - assertStreamComplete(text); - expect(events.some(e => e.type === "response.completed" || e.type === "response.failed")).toBe(true); - - // Ensure it ends with [DONE] - const lines = text.trim().split("\n"); - const lastDataLine = lines - .filter(line => line.startsWith("data:")) - .pop(); - expect(lastDataLine).toBe("data: [DONE]"); - }, 30000); - - test("should handle malformed input gracefully", async () => { - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - // Missing required 'input' field - stream: true, - }), - }); - - // Should return error status - expect(response.ok).toBe(false); - expect(response.status).toBe(400); - }, 30000); - }); - - describe("Request Size Limits", () => { - test("should reject payloads exceeding 10MB limit", async () => { - // Create a payload larger than 10MB - const largeMessage = "A".repeat(11 * 1024 * 1024); // 11MB - - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [ - { role: "user", content: largeMessage } - ], - stream: true, - }), - }); - - // Should reject with 413 (Payload Too Large) - expect(response.ok).toBe(false); - expect(response.status).toBe(413); - }, 30000); - - test("should accept payloads under 10MB limit", async () => { - // Create 
a payload just under 10MB - const acceptableMessage = "A".repeat(1024 * 1024); // 1MB - - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [ - { role: "user", content: acceptableMessage } - ], - stream: true, - }), - }); - - // Should accept and process - expect(response.ok).toBe(true); - }, 30000); - }); - - describe("Stream Robustness", () => { - test("should handle complex requests without hanging", async () => { - // Test with a complex request - // Critical behavior: stream must complete (not hang) - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [{ - role: "user", - content: "Tell me about weather, time, and calculations" - }], - stream: true, - }), - }); - - expect(response.ok).toBe(true); - const text = await response.text(); - - // Stream must complete - this is the critical behavior - assertStreamComplete(text); - }, 30000); - }); - -}); diff --git a/agent-langchain-ts/tests/framework/followup-questions.test.ts b/agent-langchain-ts/tests/framework/followup-questions.test.ts deleted file mode 100644 index d0d3b8a5..00000000 --- a/agent-langchain-ts/tests/framework/followup-questions.test.ts +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Multi-turn conversation tests for /invocations - * Verifies that conversation history (including tool calls) is handled correctly - * - * Run with: npm run test:integration - */ - -import { describe, test, expect, beforeAll, afterAll } from "@jest/globals"; -import { spawn } from "child_process"; -import type { ChildProcess } from "child_process"; -import { parseSSEStream } from "../helpers.js"; - -const PORT = 5559; -const AGENT_URL = `http://localhost:${PORT}`; -let agentProcess: ChildProcess; - -beforeAll(async () => { - agentProcess = spawn("node_modules/.bin/tsx", ["tests/framework/stub-server.ts"], { - env: { ...process.env, PORT: PORT.toString(), MLFLOW_TRACKING_URI: "noop" }, - stdio: ["ignore", "pipe", "pipe"], - }); - const start = Date.now(); - while (Date.now() - start < 20000) { - try { - const r = await fetch(`${AGENT_URL}/health`); - if (r.ok) break; - } catch {} - await new Promise((r) => setTimeout(r, 200)); - } -}, 30000); - -afterAll(() => { - if (agentProcess) agentProcess.kill(); -}); - -describe("Multi-turn conversations - /invocations", () => { - test("should handle simple followup question with context", async () => { - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [ - { role: "user", content: "My favorite color is blue" }, - { role: "assistant", content: "I'll remember that your favorite color is blue." }, - { role: "user", content: "What is my favorite color?" 
}, - ], - stream: true, - }), - }); - - expect(response.ok).toBe(true); - const text = await response.text(); - const { fullOutput, events } = parseSSEStream(text); - expect(events.some((e) => e.type === "response.output_text.delta")).toBe(true); - expect(fullOutput).toContain("Echo:"); - }, 30000); - - test("should handle followup after tool call", async () => { - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [ - { role: "user", content: "Look up the answer for me" }, - { - role: "assistant", - content: [ - { - type: "function_call", - name: "some_tool", - arguments: '{"query":"test"}', - }, - { - type: "function_call_output", - output: '"The answer is 42"', - }, - { - type: "output_text", - text: "The answer is 42.", - }, - ], - }, - { role: "user", content: "What did you just tell me?" }, - ], - stream: true, - }), - }); - - expect(response.ok).toBe(true); - const text = await response.text(); - const { fullOutput } = parseSSEStream(text); - // The value of this test is that the framework parses function_call/function_call_output - // history items without crashing. The stub returning any response proves that. - expect(fullOutput.length).toBeGreaterThan(0); - expect(fullOutput).toContain("Echo:"); - }, 30000); - - test("should handle empty previous message history", async () => { - const response = await fetch(`${AGENT_URL}/invocations`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - input: [{ role: "user", content: "What did I just tell you?" }], - stream: true, - }), - }); - - expect(response.ok).toBe(true); - const text = await response.text(); - const { fullOutput } = parseSSEStream(text); - expect(fullOutput.length).toBeGreaterThan(0); - }, 30000); -}); diff --git a/agent-langchain-ts/tests/framework/integration.test.ts b/agent-langchain-ts/tests/framework/integration.test.ts deleted file mode 100644 index 667c8812..00000000 --- a/agent-langchain-ts/tests/framework/integration.test.ts +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Integration tests for local agent endpoints - * Tests both /invocations and /api/chat - * - * Spawns agent and UI servers automatically — no external setup required. 
- * - * Run with: npm test tests/integration.test.ts - */ - -import { describe, test, expect, beforeAll, afterAll } from '@jest/globals'; -import { config as loadEnv } from "dotenv"; -import { spawn } from "child_process"; -import type { ChildProcess } from "child_process"; -import { resolve } from "path"; - -// Load agent .env so credentials flow through to the UI server subprocess -loadEnv({ path: resolve(process.cwd(), ".env") }); -import { createDatabricksProvider } from "@databricks/ai-sdk-provider"; -import { streamText } from "ai"; -import { - parseAISDKStream, -} from '../helpers.js'; - -const AGENT_PORT = 5556; -const UI_PORT = 5557; -const AGENT_URL = `http://localhost:${AGENT_PORT}`; -const UI_URL = `http://localhost:${UI_PORT}`; - -describe("Integration Tests - Local Endpoints", () => { - let agentProcess: ChildProcess; - let uiProcess: ChildProcess; - - beforeAll(async () => { - // Start framework server with stub agent (no LLM required) - agentProcess = spawn("node_modules/.bin/tsx", ["tests/framework/stub-server.ts"], { - env: { ...process.env, PORT: AGENT_PORT.toString(), MLFLOW_TRACKING_URI: "noop" }, - stdio: ["ignore", "pipe", "pipe"], - }); - - // Start UI server with API_PROXY pointing to agent - uiProcess = spawn( - `${process.cwd()}/node_modules/.bin/tsx`, - ["server/src/index.ts"], - { - cwd: `${process.cwd()}/../e2e-chatbot-app-next`, - env: { - ...process.env, - CHAT_APP_PORT: UI_PORT.toString(), - API_PROXY: `${AGENT_URL}/invocations`, - }, - stdio: ["ignore", "pipe", "pipe"], - } - ); - - // Poll /health until agent server is ready (max 20s) - const agentStart = Date.now(); - while (Date.now() - agentStart < 20000) { - try { - const r = await fetch(`${AGENT_URL}/health`); - if (r.ok) break; - } catch {} - await new Promise((r) => setTimeout(r, 200)); - } - - // Poll /ping until UI server is ready (max 20s) - const uiStart = Date.now(); - while (Date.now() - uiStart < 20000) { - try { - const r = await fetch(`${UI_URL}/ping`); - if (r.ok) break; - } catch {} - await new Promise((r) => setTimeout(r, 200)); - } - }, 60000); - - afterAll(async () => { - if (agentProcess) agentProcess.kill(); - if (uiProcess) uiProcess.kill(); - }); - - describe("/invocations endpoint", () => { - test("should respond with Databricks provider", async () => { - const databricks = createDatabricksProvider({ - baseURL: AGENT_URL, - formatUrl: ({ baseUrl, path }) => { - if (path === "/responses") { - return `${baseUrl}/invocations`; - } - return `${baseUrl}${path}`; - }, - }); - - const result = streamText({ - model: databricks.responses("test-model"), - messages: [ - { role: "user", content: "Say exactly: Databricks provider test successful" }, - ], - }); - - let fullText = ""; - for await (const chunk of result.textStream) { - fullText += chunk; - } - - expect(fullText.length).toBeGreaterThan(0); - }, 30000); - }); - - describe("/api/chat endpoint", () => { - test("should respond with useChat format", async () => { - const response = await fetch(`${UI_URL}/api/chat`, { - method: "POST", - headers: { - "Content-Type": "application/json", - "X-Forwarded-User": "test-user", - "X-Forwarded-Email": "test@example.com", - }, - body: JSON.stringify({ - id: "550e8400-e29b-41d4-a716-446655440000", - message: { - role: "user", - parts: [{ type: "text", text: "Say hello" }], - id: "550e8400-e29b-41d4-a716-446655440001", - }, - selectedChatModel: "chat-model", - selectedVisibilityType: "private", - nextMessageId: "550e8400-e29b-41d4-a716-446655440002", - }), - }); - - expect(response.ok).toBe(true); 
- const text = await response.text(); - const { hasTextDelta } = parseAISDKStream(text); - - expect(hasTextDelta).toBe(true); - }, 30000); - }); -}); diff --git a/agent-langchain-ts/tests/framework/stub-agent.ts b/agent-langchain-ts/tests/framework/stub-agent.ts deleted file mode 100644 index d528fa87..00000000 --- a/agent-langchain-ts/tests/framework/stub-agent.ts +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Deterministic stub AgentInterface for framework tests. - * - * Echoes user input as "Echo: {input}" — no LLM or network required. - * Framework tests use this so they are decoupled from user agent code. - */ - -import { randomUUID } from "crypto"; -import type { - ResponseOutputItem, - ResponseOutputMessage, - ResponseStreamEvent, -} from "openai/resources/responses/responses.js"; -import type { AgentInterface, InvokeParams } from "../../src/framework/agent-interface.js"; - -export class StubAgent implements AgentInterface { - async invoke(params: InvokeParams): Promise { - const text = `Echo: ${params.input}`; - const message: ResponseOutputMessage = { - id: `msg_${randomUUID()}`, - type: "message", - role: "assistant", - status: "completed", - content: [{ type: "output_text", text, annotations: [] }], - }; - return [message]; - } - - async *stream(params: InvokeParams): AsyncGenerator { - const text = `Echo: ${params.input}`; - const itemId = `msg_${randomUUID()}`; - let seqNum = 0; - - const msgItem: ResponseOutputMessage = { - id: itemId, - type: "message", - role: "assistant", - status: "in_progress", - content: [], - }; - - yield { - type: "response.output_item.added", - item: msgItem, - output_index: 0, - sequence_number: seqNum++, - } as unknown as ResponseStreamEvent; - - yield { - type: "response.output_text.delta", - item_id: itemId, - output_index: 0, - content_index: 0, - delta: text, - logprobs: [], - sequence_number: seqNum++, - } as unknown as ResponseStreamEvent; - - yield { - type: "response.output_item.done", - item: { ...msgItem, status: "completed" }, - output_index: 0, - sequence_number: seqNum++, - } as unknown as ResponseStreamEvent; - - yield { - type: "response.completed", - sequence_number: seqNum++, - response: {} as any, - } as unknown as ResponseStreamEvent; - } -} diff --git a/agent-langchain-ts/tests/framework/stub-server.ts b/agent-langchain-ts/tests/framework/stub-server.ts deleted file mode 100644 index 1804866d..00000000 --- a/agent-langchain-ts/tests/framework/stub-server.ts +++ /dev/null @@ -1,4 +0,0 @@ -import { startServer } from "../../src/framework/server.js"; -import { StubAgent } from "./stub-agent.js"; - -startServer(new StubAgent()).catch((e) => { console.error(e); process.exit(1); }); diff --git a/agent-langchain-ts/tests/helpers.ts b/agent-langchain-ts/tests/helpers.ts index c93b6790..054f6cbd 100644 --- a/agent-langchain-ts/tests/helpers.ts +++ b/agent-langchain-ts/tests/helpers.ts @@ -1,12 +1,7 @@ /** * Common test utilities and helpers - * Reduces duplication across test files */ -// ============================================================================ -// Configuration -// ============================================================================ - export const TEST_CONFIG = { AGENT_URL: process.env.AGENT_URL || "http://localhost:5001", UI_URL: process.env.UI_URL || "http://localhost:3001", @@ -14,24 +9,13 @@ export const TEST_CONFIG = { DEFAULT_TIMEOUT: 30000, } as const; -// ============================================================================ -// Request Helpers -// 
============================================================================
-
-/**
- * Create authorization headers with Bearer token
- */
 export function makeAuthHeaders(token: string): Record<string, string> {
   return {
     "Content-Type": "application/json",
-    "Authorization": `Bearer ${token}`,
+    Authorization: `Bearer ${token}`,
   };
 }
 
-// ============================================================================
-// SSE Stream Parsing
-// ============================================================================
-
 export interface SSEEvent {
   type: string;
   [key: string]: any;
 }
@@ -45,9 +29,6 @@ export interface ParsedSSEStream {
   toolCalls: Array<{ name: string; arguments: any }>;
 }
 
-/**
- * Parse Server-Sent Events (SSE) stream from response
- */
 export function parseSSEStream(text: string): ParsedSSEStream {
   const events: SSEEvent[] = [];
   let fullOutput = "";
@@ -62,17 +43,12 @@ export function parseSSEStream(text: string): ParsedSSEStream {
       const data = JSON.parse(line.slice(6));
       events.push(data);
 
-      // Extract text deltas
       if (data.type === "response.output_text.delta") {
         fullOutput += data.delta;
       }
-
-      // Track errors
       if (data.type === "error" || data.type === "response.failed") {
         hasError = true;
       }
-
-      // Track tool calls
       if (
         data.type === "response.output_item.done" &&
         data.item?.type === "function_call"
@@ -83,62 +59,22 @@ export function parseSSEStream(text: string): ParsedSSEStream {
          arguments: JSON.parse(data.item.arguments || "{}"),
        });
      }
-    } catch {
-      // Skip invalid JSON
-    }
+    } catch {}
     }
   }
-
   return { events, fullOutput, hasError, hasToolCall, toolCalls };
 }
 
-/**
- * Parse AI SDK streaming format (used by /api/chat)
- */
-export function parseAISDKStream(text: string): {
-  fullContent: string;
-  hasTextDelta: boolean;
-  hasToolCall: boolean;
-} {
-  let fullContent = "";
-  let hasTextDelta = false;
-  let hasToolCall = false;
-
-  const lines = text.split("\n").filter((line) => line.trim());
-
-  for (const line of lines) {
-    if (line.startsWith("data: ")) {
-      try {
-        const data = JSON.parse(line.slice(6));
-        if (data.type === "text-delta") {
-          fullContent += data.delta;
-          hasTextDelta = true;
-        }
-        if (data.type === "tool-input-available") {
-          hasToolCall = true;
-        }
-      } catch {
-        // Skip invalid JSON
-      }
-    }
+export function assertStreamComplete(text: string): void {
+  if (!text.includes("data: [DONE]")) {
+    throw new Error("Stream did not complete — missing data: [DONE]");
   }
-
-  return { fullContent, hasTextDelta, hasToolCall };
 }
 
-// ============================================================================
-// Authentication Helpers
-// ============================================================================
-
 import { exec } from "child_process";
 import { promisify } from "util";
-
 const execAsync = promisify(exec);
 
-/**
- * Get OAuth token for deployed app testing (async version)
- * Use in beforeAll() hooks for test suites
- */
 export async function getDeployedAuthToken(): Promise<string> {
   try {
     const profile = process.env.DATABRICKS_CLI_PROFILE;
@@ -150,32 +86,3 @@ export async function getDeployedAuthToken(): Promise<string> {
     throw new Error(`Failed to get auth token: ${error}`);
   }
 }
-
-
-// ============================================================================
-// Assertion Helpers
-// ============================================================================
-
-/**
- * Assert that an SSE stream completed (contains "data: [DONE]")
- */
-export function assertStreamComplete(text: string): void {
-  expect(text).toContain("data: [DONE]");
-}
-
-/**
- * Extract the first text output from a ResponseOutputItem array.
- * Use this in tests that call agent.invoke() directly.
- */
-export function getOutput(items: Awaited>): string {
-  for (const item of items) {
-    if (item.type === "message") {
-      for (const content of (item as any).content) {
-        if (content.type === "output_text") {
-          return content.text as string;
-        }
-      }
-    }
-  }
-  return "";
-}
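
Note: with parseAISDKStream, getOutput, and the stub-based integration suite removed, the retained helpers (TEST_CONFIG, makeAuthHeaders, parseSSEStream, assertStreamComplete, getDeployedAuthToken) are still enough to smoke-test a deployed agent. Below is a minimal sketch of one way to combine them; it is illustrative only, and it assumes, beyond what this diff shows, that the test file lives under tests/ next to helpers.ts and that the agent's /invocations endpoint accepts a Responses-API-style { input, stream: true } JSON body.

// Sketch of a deployed-endpoint smoke test using only the retained helpers.
// Assumptions (not from this diff): test file location tests/*.test.ts,
// request body shape { input, stream }, and AGENT_URL pointing at the app.
import { describe, test, expect } from "@jest/globals";
import {
  TEST_CONFIG,
  makeAuthHeaders,
  parseSSEStream,
  assertStreamComplete,
  getDeployedAuthToken,
} from "./helpers.js";

describe("deployed /invocations smoke test", () => {
  test("streams text and completes", async () => {
    // OAuth token comes from the Databricks CLI via getDeployedAuthToken()
    const token = await getDeployedAuthToken();

    const response = await fetch(`${TEST_CONFIG.AGENT_URL}/invocations`, {
      method: "POST",
      headers: makeAuthHeaders(token),
      body: JSON.stringify({ input: "Say hello", stream: true }),
    });
    expect(response.ok).toBe(true);

    const text = await response.text();
    assertStreamComplete(text); // throws if "data: [DONE]" never arrived
    const { fullOutput, hasError } = parseSSEStream(text);
    expect(hasError).toBe(false);
    expect(fullOutput.length).toBeGreaterThan(0);
  }, TEST_CONFIG.DEFAULT_TIMEOUT);
});

Because assertStreamComplete now throws instead of calling expect, the same helper can also be used outside Jest, for example in plain scripts that poll an endpoint.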