diff --git a/.gitignore b/.gitignore index ae5f25e..05e616b 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,9 @@ dist dist-ssr *.local +# Local MCP config (copy src-tauri/mcp.example.json → src-tauri/mcp.json) +src-tauri/mcp.json + # Editor directories and files .vscode/* !.vscode/extensions.json diff --git a/bun.lock b/bun.lock index 53bf064..c0f50af 100644 --- a/bun.lock +++ b/bun.lock @@ -5,9 +5,11 @@ "": { "name": "pengine", "dependencies": { + "@radix-ui/react-accordion": "^1.2.12", "@radix-ui/react-menubar": "^1.1.16", "@tailwindcss/vite": "^4.2.2", "@tauri-apps/api": "^2", + "@tauri-apps/plugin-dialog": "^2", "@tauri-apps/plugin-opener": "^2", "qrcode.react": "^4.2.0", "react": "^19.1.0", @@ -172,8 +174,12 @@ "@radix-ui/primitive": ["@radix-ui/primitive@1.1.3", "", {}, "sha512-JTF99U/6XIjCBo0wqkU5sK10glYe27MRRsfwoiq5zzOEZLHU3A3KCMa5X/azekYRCJ0HlwI0crAXS/5dEHTzDg=="], + "@radix-ui/react-accordion": ["@radix-ui/react-accordion@1.2.12", "", { "dependencies": { "@radix-ui/primitive": "1.1.3", "@radix-ui/react-collapsible": "1.1.12", "@radix-ui/react-collection": "1.1.7", "@radix-ui/react-compose-refs": "1.1.2", "@radix-ui/react-context": "1.1.2", "@radix-ui/react-direction": "1.1.1", "@radix-ui/react-id": "1.1.1", "@radix-ui/react-primitive": "2.1.3", "@radix-ui/react-use-controllable-state": "1.2.2" }, "peerDependencies": { "@types/react": "*", "@types/react-dom": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc", "react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react", "@types/react-dom"] }, "sha512-T4nygeh9YE9dLRPhAHSeOZi7HBXo+0kYIPJXayZfvWOWA0+n3dESrZbjfDPUABkUNym6Hd+f2IR113To8D2GPA=="], + "@radix-ui/react-arrow": ["@radix-ui/react-arrow@1.1.7", "", { "dependencies": { "@radix-ui/react-primitive": "2.1.3" }, "peerDependencies": { "@types/react": "*", "@types/react-dom": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc", "react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react", "@types/react-dom"] }, "sha512-F+M1tLhO+mlQaOWspE8Wstg+z6PwxwRd8oQ8IXceWz92kfAmalTRf0EjrouQeo7QssEPfCn05B4Ihs1K9WQ/7w=="], + "@radix-ui/react-collapsible": ["@radix-ui/react-collapsible@1.1.12", "", { "dependencies": { "@radix-ui/primitive": "1.1.3", "@radix-ui/react-compose-refs": "1.1.2", "@radix-ui/react-context": "1.1.2", "@radix-ui/react-id": "1.1.1", "@radix-ui/react-presence": "1.1.5", "@radix-ui/react-primitive": "2.1.3", "@radix-ui/react-use-controllable-state": "1.2.2", "@radix-ui/react-use-layout-effect": "1.1.1" }, "peerDependencies": { "@types/react": "*", "@types/react-dom": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc", "react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react", "@types/react-dom"] }, "sha512-Uu+mSh4agx2ib1uIGPP4/CKNULyajb3p92LsVXmH2EHVMTfZWpll88XJ0j4W0z3f8NK1eYl1+Mf/szHPmcHzyA=="], + "@radix-ui/react-collection": ["@radix-ui/react-collection@1.1.7", "", { "dependencies": { "@radix-ui/react-compose-refs": "1.1.2", "@radix-ui/react-context": "1.1.2", "@radix-ui/react-primitive": "2.1.3", "@radix-ui/react-slot": "1.2.3" }, "peerDependencies": { "@types/react": "*", "@types/react-dom": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc", "react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react", "@types/react-dom"] }, "sha512-Fh9rGN0MoI4ZFUNyfFVNU4y9LUz93u9/0K+yLgA2bwRojxM8JU1DyvvMBabnZPBgMWREAJvU2jjVzq+LrFUglw=="], "@radix-ui/react-compose-refs": 
["@radix-ui/react-compose-refs@1.1.2", "", { "peerDependencies": { "@types/react": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react"] }, "sha512-z4eqJvfiNnFMHIIvXP3CY57y2WJs5g2v3X0zm9mEJkrkNv4rDxu+sg9Jh8EkXyeqBkB7SOcboo9dMVqhyrACIg=="], @@ -330,6 +336,8 @@ "@tauri-apps/cli-win32-x64-msvc": ["@tauri-apps/cli-win32-x64-msvc@2.10.1", "", { "os": "win32", "cpu": "x64" }, "sha512-6Cn7YpPFwzChy0ERz6djKEmUehWrYlM+xTaNzGPgZocw3BD7OfwfWHKVWxXzdjEW2KfKkHddfdxK1XXTYqBRLg=="], + "@tauri-apps/plugin-dialog": ["@tauri-apps/plugin-dialog@2.7.0", "", { "dependencies": { "@tauri-apps/api": "^2.10.1" } }, "sha512-4nS/hfGMGCXiAS3LtVjH9AgsSAPJeG/7R+q8agTFqytjnMa4Zq95Bq8WzVDkckpanX+yyRHXnRtrKXkANKDHvw=="], + "@tauri-apps/plugin-opener": ["@tauri-apps/plugin-opener@2.5.3", "", { "dependencies": { "@tauri-apps/api": "^2.8.0" } }, "sha512-CCcUltXMOfUEArbf3db3kCE7Ggy1ExBEBl51Ko2ODJ6GDYHRp1nSNlQm5uNCFY5k7/ufaK5Ib3Du/Zir19IYQQ=="], "@types/babel__core": ["@types/babel__core@7.20.5", "", { "dependencies": { "@babel/parser": "^7.20.7", "@babel/types": "^7.20.7", "@types/babel__generator": "*", "@types/babel__template": "*", "@types/babel__traverse": "*" } }, "sha512-qoQprZvz5wQFJwMDqeseRXWv3rqMvhgpbXFfVyWhbx9X47POIA6i/+dXefEmZKoAgOaTdaIgNSMqMIU61yRyzA=="], diff --git a/doc/design/README.md b/doc/design/README.md index f647753..8be8082 100644 --- a/doc/design/README.md +++ b/doc/design/README.md @@ -183,6 +183,12 @@ Telegram ──► teloxide dispatcher ──► text_handler --- +## Modules + +- [MCP — Model Context Protocol](./mcp.md) — agent tool-use via external MCP servers (POC). + +--- + ## Adding a New Module ### Frontend diff --git a/doc/design/mcp.md b/doc/design/mcp.md new file mode 100644 index 0000000..fd14bd4 --- /dev/null +++ b/doc/design/mcp.md @@ -0,0 +1,112 @@ +# MCP — Model Context Protocol Module (POC) + +> Status: **proof of concept**. One server, one transport, one happy path. + +## What & Why + +[MCP](https://modelcontextprotocol.org/) is an open JSON-RPC 2.0 protocol that lets an LLM "host" discover and call tools exposed by external "servers". Pengine adopts MCP so we can grow the agent's capabilities by dropping in new servers instead of writing bespoke Rust glue for each tool. Every tool call flows through one well-defined choke point, which is what makes it auditable. + +## Roles in Pengine + +| Role | Where | Responsibility | +|---|---|---| +| **Host** | Pengine (Tauri binary) | Owns the LLM (Ollama) connection, the Telegram bot, and the agent loop. | +| **Client** | `src-tauri/src/modules/mcp/` | One `McpClient` per connected server. Speaks JSON-RPC over stdio. | +| **Server** | External child process | Anything that speaks MCP — `npx @modelcontextprotocol/server-filesystem`, a Docker container, a custom binary. | + +```text +Telegram message + │ + ▼ +bot::service::text_handler + │ + ▼ +bot::agent::run_turn ────► ollama::chat_with_tools (Ollama /api/chat) + ▲ │ + │ │ tool_calls? 
+ │ ▼ + └─────────── mcp::registry::call_tool ──► McpClient ──► child process (stdio) +``` + +## Module Layout + +```text +src-tauri/src/modules/mcp/ +├── mod.rs +├── protocol.rs JSON-RPC 2.0 request/response types +├── types.rs McpConfig, ServerEntry, ToolDef +├── transport.rs StdioTransport — child process + line-delimited JSON +├── client.rs McpClient — initialize / tools/list / tools/call +├── native.rs NativeProvider — in-process tool packs (dice) +├── registry.rs ToolRegistry — fan-out across all connected servers +└── service.rs load_or_init_config(), build_registry() +``` + +The registry lives on `AppState.mcp` (`Arc<RwLock<ToolRegistry>>`) so the bot agent and any future HTTP route can reach it. + +## Config + +File: `src-tauri/mcp.json` when running from the repo (copy `mcp.example.json`), otherwise `$APP_DATA/mcp.json` (next to `connection.json`). Created on first launch with a sane default if missing. + +```json +{ + "servers": { + "filesystem": { + "type": "stdio", + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"], + "env": {} + } + } +} +``` + +To add a server: add another entry under `servers`. Restart Pengine. + +## Protocol Subset Implemented + +Four messages, that's it: + +1. `initialize` — handshake. We send `{protocolVersion, capabilities, clientInfo}` and ignore most of the response. +2. `notifications/initialized` — required notification after init. +3. `tools/list` — discovery, cached on the client. +4. `tools/call` — `{name, arguments}` → `{content: [{type: "text", text}]}`. + +Out of scope for the POC: resources, prompts, sampling, server-initiated requests, batch JSON-RPC, HTTP transport. + +## Ollama Bridge + +MCP `inputSchema` is JSON Schema, and so are Ollama's tool `parameters` — translation is just a rename. See `build_ollama_tools` in `mcp/registry.rs`. Tools are exposed to the model by bare name; the registry resolves a call back to the owning server and also accepts a `server.tool` prefix to disambiguate duplicates. + +The agent loop in `bot::agent::run_turn`: + +1. Snapshot the available tools from the registry. +2. Send `system + user` plus the tool list to Ollama. +3. If the response carries `tool_calls`, run each via the registry, append the results as `role: "tool"` messages, loop. Capped at **3 steps** (`MAX_STEPS`). +4. Otherwise return the assistant content as the final reply. If tools ran but the model produced no usable text, a final tool-free summarization call turns the tool output into an answer. + +Use a tool-capable model (e.g. `qwen3:8b`). Check with `ollama show <model>` for the `tools` capability. + +## Audit Logs + +Every MCP-relevant event is emitted as a `LogEntry` (`kind = "mcp"` for lifecycle, `kind = "tool"` for calls) via `state.emit_log`. They flow through the existing SSE log stream (`GET /v1/logs`) and are visible on the dashboard: + +- `loading /path/to/mcp.json` +- `filesystem stdio (2 tools)` +- `ready (2 tools)` +- `model requested 1 tool call(s)` +- `[0] list_directory` +- `result (842 bytes)` +- `error: …` + +That single audit trail is the "auditable protocol" promise of this feature. + +## Try It + +1. `npx -y @modelcontextprotocol/server-filesystem /tmp` should run (Node + npm available). +2. `ollama pull qwen3:8b` (or any tool-capable model). +3. Copy `src-tauri/mcp.example.json` to `src-tauri/mcp.json` (adjust the allowed folder), then `bun run tauri dev`. Watch the dashboard for `mcp` lines confirming the filesystem server connected. +4. Connect a Telegram bot, then send: *"List the files in /tmp."* +5. Expect a tool call and a tool result line in the log, followed by a coherent reply on Telegram. + +## Future Work + +Permission prompts, multiple servers in the default config, a frontend tools panel, hot reload of `mcp.json`, HTTP/SSE transport, resources & prompts. Not in this PR.
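For orientation, here is a sketch of the wire traffic this subset produces, written with `serde_json` like the rest of the module. The `id` values and the `list_directory`/`/tmp` call are illustrative, not taken from the code; only the `protocolVersion` and `clientInfo` fields mirror `client.rs`:

```rust
use serde_json::json;

fn main() {
    // 1. initialize — handshake; protocolVersion/clientInfo match what client.rs sends.
    let initialize = json!({
        "jsonrpc": "2.0", "id": 1, "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": { "name": "pengine", "version": "0.1.0" }
        }
    });
    // 2. notifications/initialized — a notification: no id, no response expected.
    let initialized = json!({ "jsonrpc": "2.0", "method": "notifications/initialized" });
    // 3. tools/list — discovery; the result's `tools` array is cached on the client.
    let list = json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/list" });
    // 4. tools/call — tool name and arguments here are hypothetical examples.
    let call = json!({
        "jsonrpc": "2.0", "id": 3, "method": "tools/call",
        "params": { "name": "list_directory", "arguments": { "path": "/tmp" } }
    });
    // One JSON object per line on the child's stdin, matching StdioTransport.
    for msg in [&initialize, &initialized, &list, &call] {
        println!("{msg}");
    }
}
```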
diff --git a/e2e/setup-dashboard.spec.ts b/e2e/setup-dashboard.spec.ts index f50f0c4..0427562 100644 --- a/e2e/setup-dashboard.spec.ts +++ b/e2e/setup-dashboard.spec.ts @@ -82,16 +82,83 @@ async function mockApis(page: import("@playwright/test").Page) { await route.continue(); } }); + + await page.route(`${PENGINE_API_BASE}/v1/ollama/models`, async (route) => { + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify({ + reachable: true, + active_model: "qwen3-coder:30b", + selected_model: null, + models: ["qwen3-coder:30b"], + }), + }); + }); + + await page.route(`${PENGINE_API_BASE}/v1/ollama/model`, async (route) => { + if (route.request().method() === "PUT") { + let selected_model: string | null = null; + const raw = route.request().postData(); + if (raw) { + try { + const body = JSON.parse(raw) as { model?: string | null }; + let m: string | null = null; + if (typeof body.model === "string") m = body.model.trim(); + selected_model = m && m.length > 0 ? m : null; + } catch { + /* ignore malformed body */ + } + } + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify({ selected_model }), + }); + } else { + await route.continue(); + } + }); + + await page.route( + (url) => url.href.startsWith(`${PENGINE_API_BASE}/v1/mcp/servers`), + async (route) => { + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify({ servers: {} }), + }); + }, + ); + + await page.route( + (url) => url.href.startsWith(`${PENGINE_API_BASE}/v1/mcp/tools`), + async (route) => { + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify([]), + }); + }, + ); } test.describe("setup to dashboard flow", () => { test("shows 'no device' on dashboard when disconnected", async ({ page }) => { + // Force offline so the assertion does not depend on a local Pengine/Ollama install. 
+ await page.route(`${PENGINE_API_BASE}/v1/health`, async (route) => { + await route.abort("failed"); + }); + await page.route(`${PENGINE_API_BASE}/v1/ollama/models`, async (route) => { + await route.abort("failed"); + }); + await page.goto("/dashboard"); await expect(page.getByTestId("app-ready")).toBeVisible(); await expect(page).toHaveURL(/\/dashboard$/); - await expect(page.getByText("No device connected")).toBeVisible(); - await expect(page.getByRole("link", { name: "Go to setup" })).toBeVisible(); + await expect(page.getByText("Some services offline")).toBeVisible({ timeout: 15_000 }); + await expect(page.getByRole("link", { name: "Setup", exact: true })).toBeVisible(); }); test("walks all setup wizard steps and opens dashboard", async ({ page }) => { @@ -128,10 +195,8 @@ test.describe("setup to dashboard flow", () => { await page.getByRole("button", { name: "Open dashboard" }).click(); await expect(page).toHaveURL(/\/dashboard$/); - await expect( - page.getByRole("heading", { name: "Connected device and running services" }), - ).toBeVisible(); - await expect(page.getByText("Telegram gateway")).toBeVisible(); + await expect(page.getByText("All systems running")).toBeVisible({ timeout: 15_000 }); + await expect(page.getByText("@TestPengineBot")).toBeVisible(); }); test("loads dashboard when device is already connected", async ({ page }) => { @@ -144,6 +209,7 @@ test.describe("setup to dashboard flow", () => { await expect(page.getByTestId("app-ready")).toBeVisible(); await expect(page).toHaveURL(/\/dashboard$/); - await expect(page.getByText("1 connected device")).toBeVisible(); + await expect(page.getByText("All systems running")).toBeVisible({ timeout: 15_000 }); + await expect(page.getByRole("button", { name: "Disconnect" })).toBeVisible(); }); }); diff --git a/package.json b/package.json index 04b32ef..57f24d2 100644 --- a/package.json +++ b/package.json @@ -30,9 +30,11 @@ ] }, "dependencies": { + "@radix-ui/react-accordion": "^1.2.12", "@radix-ui/react-menubar": "^1.1.16", "@tailwindcss/vite": "^4.2.2", "@tauri-apps/api": "^2", + "@tauri-apps/plugin-dialog": "^2", "@tauri-apps/plugin-opener": "^2", "qrcode.react": "^4.2.0", "react": "^19.1.0", diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 91b3149..ee2b6d7 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -2744,6 +2744,7 @@ checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272" dependencies = [ "bitflags 2.11.0", "block2", + "libc", "objc2", "objc2-core-foundation", ] @@ -2957,12 +2958,15 @@ dependencies = [ "axum", "chrono", "env_logger", + "fastrand", + "log", "reqwest 0.13.2", "serde", "serde_json", "socket2 0.5.10", "tauri", "tauri-build", + "tauri-plugin-dialog", "tauri-plugin-opener", "teloxide", "tokio", @@ -3763,6 +3767,30 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rfd" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15ad77d9e70a92437d8f74c35d99b4e4691128df018833e99f90bcd36152672" +dependencies = [ + "block2", + "dispatch2", + "glib-sys", + "gobject-sys", + "gtk-sys", + "js-sys", + "log", + "objc2", + "objc2-app-kit", + "objc2-core-foundation", + "objc2-foundation", + "raw-window-handle", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows-sys 0.60.2", +] + [[package]] name = "rgb" version = "0.8.53" @@ -4726,6 +4754,48 @@ dependencies = [ "walkdir", ] +[[package]] +name = "tauri-plugin-dialog" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a1fa4150c95ae391946cc8b8f905ab14797427caba3a8a2f79628e956da91809" +dependencies = [ + "log", + "raw-window-handle", + "rfd", + "serde", + "serde_json", + "tauri", + "tauri-plugin", + "tauri-plugin-fs", + "thiserror 2.0.18", + "url", +] + +[[package]] +name = "tauri-plugin-fs" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36e1ec28b79f3d0683f4507e1615c36292c0ea6716668770d4396b9b39871ed8" +dependencies = [ + "anyhow", + "dunce", + "glob", + "log", + "objc2-foundation", + "percent-encoding", + "schemars 0.8.22", + "serde", + "serde_json", + "serde_repr", + "tauri", + "tauri-plugin", + "tauri-utils", + "thiserror 2.0.18", + "toml 0.9.12+spec-1.1.0", + "url", +] + [[package]] name = "tauri-plugin-opener" version = "2.5.3" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 6269015..22f4094 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -20,6 +20,7 @@ tauri-build = { version = "2", features = [] } [dependencies] tauri = { version = "2", features = [] } tauri-plugin-opener = "2" +log = "0.4" serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["full"] } @@ -31,4 +32,6 @@ env_logger = "0.11" chrono = { version = "0.4", features = ["serde"] } tokio-stream = { version = "0.1", features = ["sync"] } socket2 = "0.5" +fastrand = "2" +tauri-plugin-dialog = "2" diff --git a/src-tauri/capabilities/default.json b/src-tauri/capabilities/default.json index ecdd4ce..fdfa076 100644 --- a/src-tauri/capabilities/default.json +++ b/src-tauri/capabilities/default.json @@ -6,6 +6,7 @@ "permissions": [ "core:default", "opener:default", + "dialog:default", "core:event:default", "core:event:allow-listen", "core:event:allow-emit" diff --git a/src-tauri/mcp.example.json b/src-tauri/mcp.example.json new file mode 100644 index 0000000..52dcec6 --- /dev/null +++ b/src-tauri/mcp.example.json @@ -0,0 +1,18 @@ +{ + "servers": { + "dice": { + "type": "native", + "id": "dice" + }, + "filesystem": { + "type": "stdio", + "command": "npx", + "args": [ + "-y", + "@modelcontextprotocol/server-filesystem", + "/absolute/path/to/allowed/folder" + ], + "env": {} + } + } +} diff --git a/src-tauri/src/app.rs b/src-tauri/src/app.rs index 6f4082c..f089a55 100644 --- a/src-tauri/src/app.rs +++ b/src-tauri/src/app.rs @@ -1,5 +1,6 @@ use crate::infrastructure::http_server; use crate::modules::bot::{commands, repository, service as bot_service}; +use crate::modules::mcp::service as mcp_service; use crate::shared::state::AppState; use std::path::PathBuf; use tauri::Manager; @@ -18,22 +19,42 @@ pub fn run() { tauri::Builder::default() .plugin(tauri_plugin_opener::init()) + .plugin(tauri_plugin_dialog::init()) .setup(|app| { let path = store_path(app); - let shared_state = AppState::new(path); + let (mcp_path, mcp_src) = mcp_service::resolve_mcp_config_path(&path); + let shared_state = AppState::new(path, mcp_path, mcp_src.to_string()); { let handle = app.handle().clone(); let state = shared_state.clone(); - tauri::async_runtime::spawn(async move { - let mut lock = state.app_handle.lock().await; - *lock = Some(handle); + tauri::async_runtime::block_on(async move { + *state.app_handle.lock().await = Some(handle); }); } app.manage(shared_state.clone()); - // Resume persisted connection if present + // Load MCP before any bot work so the first Telegram message never sees an empty registry. 
+ let mcp_path = shared_state.mcp_config_path.clone(); + let mcp_state = shared_state.clone(); + tauri::async_runtime::block_on(async move { + mcp_state + .emit_log("mcp", &format!("loading {}", mcp_path.display())) + .await; + match mcp_service::load_or_init_config(&mcp_path) { + Ok(cfg) => { + mcp_service::rebuild_registry_into_state(&mcp_state, &cfg).await; + } + Err(e) => { + mcp_state + .emit_log("mcp", &format!("mcp.json error: {e}")) + .await; + } + } + }); + + // Resume persisted Telegram connection if present. let resume_state = shared_state.clone(); tauri::async_runtime::spawn(async move { let Some(conn) = repository::load(&resume_state.store_path) else { @@ -43,15 +64,12 @@ .emit_log("ok", &format!("Resuming bot @{}…", conn.bot_username)) .await; let token = conn.bot_token.clone(); - { - let mut lock = resume_state.connection.lock().await; - *lock = Some(conn); - } + *resume_state.connection.lock().await = Some(conn); let shutdown = resume_state.shutdown_notify.clone(); bot_service::start_bot(resume_state, token, shutdown).await; }); - // Start localhost HTTP API + // Start localhost HTTP API. let server_state = shared_state.clone(); tauri::async_runtime::spawn(async move { http_server::start_server(server_state).await; @@ -62,6 +80,7 @@ .invoke_handler(tauri::generate_handler![ commands::get_connection_status, commands::disconnect_bot, + commands::pick_mcp_filesystem_folder, ]) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/src-tauri/src/infrastructure/http_server.rs b/src-tauri/src/infrastructure/http_server.rs index 37debab..268924b 100644 --- a/src-tauri/src/infrastructure/http_server.rs +++ b/src-tauri/src/infrastructure/http_server.rs @@ -1,10 +1,12 @@ use crate::infrastructure::bot_lifecycle; use crate::modules::bot::{repository, service as bot_service}; +use crate::modules::mcp::service as mcp_service; +use crate::modules::ollama::service as ollama_service; use crate::shared::state::{AppState, ConnectionData}; -use axum::extract::State; +use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::{Json, Sse}; -use axum::routing::{delete, get, post}; +use axum::routing::{delete, get, post, put}; use axum::Router; use chrono::Utc; use serde::{Deserialize, Serialize}; @@ -41,6 +43,39 @@ pub struct ErrorResponse { pub error: String, } +#[derive(Serialize)] +pub struct McpToolDto { + pub server: String, + pub name: String, + pub description: Option<String>, +} + +#[derive(Serialize)] +pub struct McpConfigInfoResponse { + pub config_path: String, + /// `"project"` or `"app_data"` + pub source: String, + pub filesystem_allowed_paths: Vec<String>, +} + +#[derive(Deserialize)] +pub struct PutMcpFilesystemBody { + pub paths: Vec<String>, +} + +#[derive(Serialize)] +pub struct OllamaModelsResponse { + pub reachable: bool, + pub active_model: Option<String>, + pub selected_model: Option<String>, + pub models: Vec<String>, +} + +#[derive(Deserialize)] +pub struct PutOllamaModelBody { + pub model: Option<String>, +} + pub async fn start_server(state: AppState) { let cors = CorsLayer::new() .allow_origin(Any) @@ -52,6 +87,14 @@ .route("/v1/connect", delete(handle_disconnect)) .route("/v1/health", get(handle_health)) .route("/v1/logs", get(handle_logs_sse)) + .route("/v1/ollama/models", get(handle_ollama_models)) + .route("/v1/ollama/model", put(handle_ollama_model_put)) + .route("/v1/mcp/tools", get(handle_mcp_tools)) + .route("/v1/mcp/config", get(handle_mcp_config_get)) +
.route("/v1/mcp/filesystem", put(handle_mcp_filesystem_put)) + .route("/v1/mcp/servers", get(handle_mcp_servers_list)) + .route("/v1/mcp/servers/{name}", put(handle_mcp_server_upsert)) + .route("/v1/mcp/servers/{name}", delete(handle_mcp_server_delete)) .layer(cors) .with_state(state.clone()); @@ -65,8 +108,6 @@ pub async fn start_server(state: AppState) { axum::serve(listener, app).await.expect("axum serve failed"); } -/// Bind with `SO_REUSEADDR` so a quick restart can reclaim the port after the old socket -/// enters `TIME_WAIT`. Falls back to the same error as plain bind if another process still listens. fn bind_loopback_reuse(addr: std::net::SocketAddr) -> std::io::Result { let socket = Socket::new(Domain::for_address(addr), Type::STREAM, None)?; socket.set_nonblocking(true)?; @@ -210,6 +251,282 @@ async fn handle_health(State(state): State) -> Json { }) } +async fn handle_ollama_models(State(state): State) -> Json { + let selected_model = state.preferred_ollama_model.read().await.clone(); + match ollama_service::model_catalog(3000).await { + Ok(catalog) => Json(OllamaModelsResponse { + reachable: true, + active_model: catalog.active, + selected_model, + models: catalog.models, + }), + Err(_) => Json(OllamaModelsResponse { + reachable: false, + active_model: None, + selected_model, + models: Vec::new(), + }), + } +} + +async fn handle_ollama_model_put( + State(state): State, + Json(body): Json, +) -> Result<(StatusCode, Json), (StatusCode, Json)> { + let normalized = body + .model + .as_ref() + .map(|m| m.trim().to_string()) + .filter(|m| !m.is_empty()); + + if let Some(ref model) = normalized { + let catalog = ollama_service::model_catalog(3000) + .await + .map_err(|e| (StatusCode::BAD_GATEWAY, Json(ErrorResponse { error: e })))?; + if !catalog.models.iter().any(|m| m == model) { + return Err(( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: format!("model '{model}' is not available in Ollama"), + }), + )); + } + } + + { + let mut lock = state.preferred_ollama_model.write().await; + *lock = normalized.clone(); + } + + state + .emit_log( + "run", + &format!( + "ollama model {}", + normalized + .as_ref() + .map(|m| format!("set to '{m}'")) + .unwrap_or_else(|| "reset to active".to_string()) + ), + ) + .await; + + Ok(( + StatusCode::OK, + Json(serde_json::json!({ "selected_model": normalized })), + )) +} + +async fn handle_mcp_config_get(State(state): State) -> Json { + let filesystem_allowed_paths = state + .mcp_config_path + .exists() + .then(|| mcp_service::read_config(&state.mcp_config_path).ok()) + .flatten() + .map(|c| mcp_service::filesystem_allowed_paths(&c)) + .unwrap_or_default(); + + Json(McpConfigInfoResponse { + config_path: state.mcp_config_path.to_string_lossy().into_owned(), + source: state.mcp_config_source.clone(), + filesystem_allowed_paths, + }) +} + +async fn handle_mcp_filesystem_put( + State(state): State, + Json(body): Json, +) -> Result<(StatusCode, Json), (StatusCode, Json)> { + let paths: Vec = body + .paths + .iter() + .map(|p| p.trim().to_string()) + .filter(|p| !p.is_empty()) + .collect(); + if paths.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: "at least one path is required".into(), + }), + )); + } + + let _guard = state.mcp_config_mutex.lock().await; + + let mut cfg = if state.mcp_config_path.exists() { + mcp_service::read_config(&state.mcp_config_path) + .map_err(|e| (StatusCode::BAD_REQUEST, Json(ErrorResponse { error: e })))? 
+ } else { + mcp_service::load_or_init_config(&state.mcp_config_path).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })? + }; + + mcp_service::set_filesystem_allowed_paths(&mut cfg, &paths); + mcp_service::save_config(&state.mcp_config_path, &cfg).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })?; + + state + .emit_log( + "mcp", + &format!( + "filesystem allowed paths ({}) updated → {}", + paths.len(), + state.mcp_config_path.display() + ), + ) + .await; + + mcp_service::rebuild_registry_into_state(&state, &cfg).await; + + Ok((StatusCode::OK, Json(serde_json::json!({ "ok": true })))) +} + +async fn handle_mcp_tools(State(state): State<AppState>) -> Json<Vec<McpToolDto>> { + Json( + state + .mcp + .read() + .await + .all_tools() + .into_iter() + .map(|t| McpToolDto { + server: t.server_name, + name: t.name, + description: t.description, + }) + .collect(), + ) +} + +// ── MCP server CRUD ────────────────────────────────────────────────── + +#[derive(Serialize)] +struct McpServersResponse { + servers: std::collections::BTreeMap<String, crate::modules::mcp::types::ServerEntry>, +} + +async fn handle_mcp_servers_list( + State(state): State<AppState>, +) -> Result<Json<McpServersResponse>, (StatusCode, Json<ErrorResponse>)> { + let cfg = { + let _guard = state.mcp_config_mutex.lock().await; + mcp_service::load_or_init_config(&state.mcp_config_path).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })? + }; + Ok(Json(McpServersResponse { + servers: cfg.servers, + })) +} + +fn is_valid_server_name(name: &str) -> bool { + !name.is_empty() + && name.len() <= 64 + && name + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') +} + +async fn handle_mcp_server_upsert( + State(state): State<AppState>, + Path(name): Path<String>, + Json(entry): Json<crate::modules::mcp::types::ServerEntry>, +) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, Json<ErrorResponse>)> { + if !is_valid_server_name(&name) { + return Err(( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: "server name must be alphanumeric, hyphens, or underscores (max 64 chars)" + .into(), + }), + )); + } + + if let crate::modules::mcp::types::ServerEntry::Stdio { ref command, ..
} = entry { + if command.trim().is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: "command must not be empty".into(), + }), + )); + } + } + + let _guard = state.mcp_config_mutex.lock().await; + let mut cfg = mcp_service::load_or_init_config(&state.mcp_config_path).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })?; + + cfg.servers.insert(name.clone(), entry); + + mcp_service::save_config(&state.mcp_config_path, &cfg).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })?; + + state + .emit_log("mcp", &format!("server '{name}' saved")) + .await; + mcp_service::rebuild_registry_into_state(&state, &cfg).await; + + Ok((StatusCode::OK, Json(serde_json::json!({ "ok": true })))) +} + +async fn handle_mcp_server_delete( + State(state): State<AppState>, + Path(name): Path<String>, +) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, Json<ErrorResponse>)> { + let _guard = state.mcp_config_mutex.lock().await; + + let mut cfg = mcp_service::load_or_init_config(&state.mcp_config_path).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })?; + + if cfg.servers.remove(&name).is_none() { + return Err(( + StatusCode::NOT_FOUND, + Json(ErrorResponse { + error: format!("server '{name}' not found"), + }), + )); + } + + mcp_service::save_config(&state.mcp_config_path, &cfg).map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { error: e }), + ) + })?; + + state + .emit_log("mcp", &format!("server '{name}' removed")) + .await; + mcp_service::rebuild_registry_into_state(&state, &cfg).await; + + Ok((StatusCode::OK, Json(serde_json::json!({ "ok": true })))) +} + async fn handle_logs_sse( State(state): State<AppState>, ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 49488d5..a56f05f 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,6 +1,6 @@ mod app; mod infrastructure; -mod modules; +pub mod modules; mod shared; pub fn run() { diff --git a/src-tauri/src/modules/bot/agent.rs b/src-tauri/src/modules/bot/agent.rs new file mode 100644 index 0000000..c855e7c --- /dev/null +++ b/src-tauri/src/modules/bot/agent.rs @@ -0,0 +1,293 @@ +use crate::modules::ollama::service as ollama; +use crate::shared::state::AppState; +use serde_json::json; +use std::time::{Duration, Instant}; + +const MAX_STEPS: usize = 3; + +/// Ollama sometimes returns `function.arguments` as a JSON string; normalize to an object. +fn tool_call_arguments(call: &serde_json::Value) -> serde_json::Value { + let raw = call.get("function").and_then(|f| f.get("arguments")); + match raw { + None => json!({}), + Some(serde_json::Value::String(s)) => { + serde_json::from_str::<serde_json::Value>(s).unwrap_or_else(|_| json!({})) + } + Some(v) => v.clone(), + } +} + +fn fmt_duration(d: Duration) -> String { + let ms = d.as_millis(); + if ms < 1000 { + format!("{ms}ms") + } else { + format!("{:.1}s", d.as_secs_f64()) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReplySource { + Model, + Tool, +} + +pub struct TurnResult { + pub text: String, + pub source: ReplySource, +} + +pub async fn run_turn(state: &AppState, user_message: &str) -> Result<TurnResult, String> { + let model = if let Some(selected) = state.preferred_ollama_model.read().await.clone() { + selected + } else { + ollama::active_model().await?
+ }; + + let (ollama_tools, has_tools) = { + let reg = state.mcp.read().await; + (reg.ollama_tools(), !reg.is_empty()) + }; + + let fs_context = { + let paths = state.cached_filesystem_paths.read().await.clone(); + if paths.is_empty() { + String::new() + } else { + let listing = paths.join(", "); + format!( + "\nFile tools operate on these directories: {listing}\n\ + Always use absolute paths rooted in one of those directories." + ) + } + }; + + let system = if has_tools { + format!( + "You are a helpful assistant with tool access.\n\ + Rules:\n\ + - Call a tool ONLY when you need external data you don't already have.\n\ + - After receiving tool results, answer the user's question immediately in the same response.\n\ + - Be concise and direct.{fs_context}" + ) + } else { + "Answer concisely.".to_string() + }; + + let mut messages = json!([ + { "role": "system", "content": system }, + { "role": "user", "content": user_message } + ]); + + let mut tool_results: Vec<(String, String)> = Vec::new(); + let mut tools_supported = true; + let empty_tools = json!([]); + + // Phase 1: let the model call tools (up to MAX_STEPS rounds). + for step in 0..MAX_STEPS { + let t_model = Instant::now(); + let effective_tools = if tools_supported { + &ollama_tools + } else { + &empty_tools + }; + let result = ollama::chat_with_tools(&model, &messages, effective_tools).await?; + let msg = result.message; + if !result.tools_sent && tools_supported { + tools_supported = false; + state + .emit_log( + "tool", + &format!("{model} does not support tools — answering without them"), + ) + .await; + } + state + .emit_log( + "time", + &format!("model step {step} {}", fmt_duration(t_model.elapsed())), + ) + .await; + + if let Some(arr) = messages.as_array_mut() { + arr.push(msg.clone()); + } + + let tool_calls = msg + .get("tool_calls") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default(); + + if tool_calls.is_empty() { + let text = msg + .get("content") + .and_then(|v| v.as_str()) + .unwrap_or("") + .trim() + .to_string(); + + if !text.is_empty() { + // Model already produced a usable answer (with or without prior tool data). + state + .emit_log( + "tool", + if tool_results.is_empty() { + "model replied in text" + } else { + "answered from tool data" + }, + ) + .await; + return Ok(TurnResult { + text, + source: ReplySource::Model, + }); + } + + // Model returned no text after tools ran — fall through to summarize. 
+ if tool_results.is_empty() { + return Ok(TurnResult { + text: String::new(), + source: ReplySource::Model, + }); + } + break; + } + + state + .emit_log( + "tool", + &format!("model requested {} tool call(s)", tool_calls.len()), + ) + .await; + + let mut direct_replies: Vec<String> = Vec::new(); + + for call in &tool_calls { + let name = call + .get("function") + .and_then(|f| f.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let args = tool_call_arguments(call); + + state.emit_log("tool", &format!("[{step}] {name}")).await; + + let t_tool = Instant::now(); + let resolved = { + let reg = state.mcp.read().await; + reg.resolve_tool(&name) + }; + let (result_text, is_direct) = match resolved { + Ok((provider, tool_name, direct)) => { + match provider.call_tool(&tool_name, args).await { + Ok(text) => { + state + .emit_log("tool", &format!("result ({} bytes)", text.len())) + .await; + (text, direct) + } + Err(e) => { + state.emit_log("tool", &format!("error: {e}")).await; + (format!("ERROR: {e}"), false) + } + } + } + Err(e) => { + state.emit_log("tool", &format!("error: {e}")).await; + (format!("ERROR: {e}"), false) + } + }; + state + .emit_log( + "time", + &format!("tool {name} {}", fmt_duration(t_tool.elapsed())), + ) + .await; + + tool_results.push((name.clone(), result_text.clone())); + + if is_direct { + direct_replies.push(result_text.clone()); + } + + if let Some(arr) = messages.as_array_mut() { + arr.push(json!({ + "role": "tool", + "name": name, + "content": result_text, + })); + } + } + + if !direct_replies.is_empty() { + return Ok(TurnResult { + text: direct_replies.join("\n\n"), + source: ReplySource::Tool, + }); + } + } + + // Phase 2: tools ran but model didn't produce a good answer yet. + // Make a clean summarization call — no tools, plain Q&A with inlined data. + if !tool_results.is_empty() { + let mut data_block = String::new(); + for (name, content) in &tool_results { + data_block.push_str(&format!("--- {name} result ---\n{content}\n")); + } + + let summary_messages = json!([ + { + "role": "system", + "content": "Answer the user's question using ONLY the data provided below. Be concise and direct."
+ }, + { + "role": "user", + "content": format!("{user_message}\n\nData:\n{data_block}") + } + ]); + + let empty = json!([]); + let t_summary = Instant::now(); + let summary_result = ollama::chat_with_tools(&model, &summary_messages, &empty).await?; + let summary_msg = summary_result.message; + state + .emit_log( + "time", + &format!("summarize {}", fmt_duration(t_summary.elapsed())), + ) + .await; + + let text = summary_msg + .get("content") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + if !text.trim().is_empty() { + state.emit_log("tool", "answered from tool data").await; + return Ok(TurnResult { + text, + source: ReplySource::Model, + }); + } + + let fallback = tool_results + .last() + .map(|(_, c)| c.clone()) + .expect("tool_results must be non-empty here after guard"); + state + .emit_log("tool", "empty summary, returning raw tool output") + .await; + return Ok(TurnResult { + text: fallback, + source: ReplySource::Tool, + }); + } + + Err(format!( + "agent exceeded {MAX_STEPS} steps without finishing" + )) +} diff --git a/src-tauri/src/modules/bot/commands.rs b/src-tauri/src/modules/bot/commands.rs index ae4017f..d620866 100644 --- a/src-tauri/src/modules/bot/commands.rs +++ b/src-tauri/src/modules/bot/commands.rs @@ -1,6 +1,8 @@ use crate::infrastructure::bot_lifecycle; use crate::modules::bot::repository; use crate::shared::state::AppState; +#[cfg(desktop)] +use tauri_plugin_dialog::DialogExt; #[tauri::command] pub async fn get_connection_status( @@ -26,3 +28,21 @@ pub async fn disconnect_bot(state: tauri::State<'_, AppState>) -> Result<(), String> { + +#[cfg(desktop)] +#[tauri::command] +pub async fn pick_mcp_filesystem_folder(app: tauri::AppHandle) -> Result<Option<String>, String> { + let folder = app + .dialog() + .file() + .set_title("Folder for MCP filesystem tools") + .blocking_pick_folder(); + Ok(folder.map(|p| p.to_string())) +} + +#[cfg(not(desktop))] +#[tauri::command] +pub async fn pick_mcp_filesystem_folder() -> Result<Option<String>, String> { + Err("folder picker is only available on desktop".into()) +} diff --git a/src-tauri/src/modules/bot/mod.rs b/src-tauri/src/modules/bot/mod.rs index 6a6f35e..5001b1c 100644 --- a/src-tauri/src/modules/bot/mod.rs +++ b/src-tauri/src/modules/bot/mod.rs @@ -1,3 +1,4 @@ +pub mod agent; pub mod commands; pub mod repository; pub mod service; diff --git a/src-tauri/src/modules/bot/service.rs b/src-tauri/src/modules/bot/service.rs index 3c1ed64..517fbd2 100644 --- a/src-tauri/src/modules/bot/service.rs +++ b/src-tauri/src/modules/bot/service.rs @@ -1,8 +1,8 @@ -use crate::modules::ollama::service as ollama; +use crate::modules::bot::agent; use crate::shared::state::AppState; use std::sync::Arc; use teloxide::prelude::*; -use teloxide::types::Me; +use teloxide::types::{ChatAction, Me}; use teloxide::utils::command::BotCommands; use tokio::sync::Notify; @@ -98,31 +98,44 @@ async fn command_handler( } async fn text_handler(bot: Bot, msg: Message, state: AppState) -> ResponseResult<()> { - let incoming = msg.text().unwrap_or(""); + let incoming = msg.text().unwrap_or("").to_string(); state .emit_log("msg", &format!("from {}: {}", user_label(&msg), incoming)) .await; - match ollama::active_model().await { - Ok(model) => { - state - .emit_log("tool", &format!("routing to ollama → {model}")) - .await; - match ollama::chat(&model, incoming).await { - Ok(reply) => { - state.emit_log("reply", &format!("→ {reply}")).await; - bot.send_message(msg.chat.id, &reply).await?; - } - Err(e) => { - state.emit_log("run", &format!("ollama error: {e}")).await; - send_inference_unavailable(&bot, &msg, &state).await; - } + // Telegram's `typing` action lasts ~5 seconds.
Refresh it every 4s in a + // background task while the agent runs so the user sees a continuous + // "writing…" indicator no matter how long the tool calls take. + let typing_task = { + let bot = bot.clone(); + let chat_id = msg.chat.id; + tokio::spawn(async move { + loop { + let _ = bot.send_chat_action(chat_id, ChatAction::Typing).await; + tokio::time::sleep(std::time::Duration::from_secs(4)).await; } + }) + }; + + let result = agent::run_turn(&state, &incoming).await; + typing_task.abort(); + + match result { + Ok(turn) => { + let reply = if turn.text.trim().is_empty() { + "(no reply)".to_string() + } else { + turn.text + }; + let tag = match turn.source { + agent::ReplySource::Tool => "tool", + agent::ReplySource::Model => "model", + }; + state.emit_log("reply", &format!("[{tag}] {reply}")).await; + bot.send_message(msg.chat.id, &reply).await?; } Err(e) => { - state - .emit_log("run", &format!("no ollama model available: {e}")) - .await; + state.emit_log("run", &format!("agent error: {e}")).await; send_inference_unavailable(&bot, &msg, &state).await; } } diff --git a/src-tauri/src/modules/mcp/client.rs b/src-tauri/src/modules/mcp/client.rs new file mode 100644 index 0000000..74256b4 --- /dev/null +++ b/src-tauri/src/modules/mcp/client.rs @@ -0,0 +1,95 @@ +use super::transport::StdioTransport; +use super::types::ToolDef; +use serde_json::{json, Value}; +use std::collections::HashMap; + +pub struct McpClient { + pub server_name: String, + transport: StdioTransport, + pub tools: Vec<ToolDef>, +} + +impl McpClient { + pub async fn connect( + server_name: String, + command: String, + args: Vec<String>, + env: HashMap<String, String>, + direct_return: bool, + ) -> Result<Self, String> { + let transport = StdioTransport::spawn(&command, &args, &env).await?; + + let init_params = json!({ + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": { "name": "pengine", "version": "0.1.0" }, + }); + transport.call("initialize", Some(init_params)).await?; + let _ = transport.notify("notifications/initialized", None).await; + + let result = transport.call("tools/list", None).await?; + let mut tools = parse_tools(&server_name, &result); + + if direct_return { + for tool in &mut tools { + tool.direct_return = true; + } + } + + Ok(Self { + server_name, + transport, + tools, + }) + } + + pub async fn call_tool(&self, name: &str, args: Value) -> Result<String, String> { + let result = self + .transport + .call( + "tools/call", + Some(json!({ "name": name, "arguments": args })), + ) + .await?; + + let mut out = String::new(); + if let Some(items) = result.get("content").and_then(|v| v.as_array()) { + for item in items { + if let Some(t) = item.get("text").and_then(|v| v.as_str()) { + if !out.is_empty() { + out.push('\n'); + } + out.push_str(t); + } + } + } + if out.is_empty() { + out = result.to_string(); + } + Ok(out) + } +} + +fn parse_tools(server_name: &str, result: &Value) -> Vec<ToolDef> { + let Some(arr) = result.get("tools").and_then(|v| v.as_array()) else { + return vec![]; + }; + arr.iter() + .filter_map(|t| { + let name = t.get("name")?.as_str()?.to_string(); + Some(ToolDef { + server_name: server_name.to_string(), + name, + description: t + .get("description") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + input_schema: t + .get("inputSchema") + .cloned() + .unwrap_or_else(|| json!({"type": "object"})), + direct_return: false, + }) + }) + .collect() +} diff --git a/src-tauri/src/modules/mcp/mod.rs b/src-tauri/src/modules/mcp/mod.rs new file mode 100644 index 0000000..67116d9 --- /dev/null +++ b/src-tauri/src/modules/mcp/mod.rs @@ -0,0 +1,7
@@ +pub mod client; +pub mod native; +pub mod protocol; +pub mod registry; +pub mod service; +pub mod transport; +pub mod types; diff --git a/src-tauri/src/modules/mcp/native.rs b/src-tauri/src/modules/mcp/native.rs new file mode 100644 index 0000000..89221ad --- /dev/null +++ b/src-tauri/src/modules/mcp/native.rs @@ -0,0 +1,67 @@ +use super::types::ToolDef; +use serde_json::{json, Value}; + +const MAX_SIDES: u64 = 1_000_000; + +pub struct NativeProvider { + pub server_name: String, + pub tools: Vec<ToolDef>, + handler: fn(&str, &Value) -> Result<String, String>, +} + +impl NativeProvider { + pub fn call(&self, tool_name: &str, args: &Value) -> Result<String, String> { + if !self.tools.iter().any(|t| t.name == tool_name) { + return Err(format!("unknown native tool: {tool_name}")); + } + (self.handler)(tool_name, args) + } +} + +/// Built-in dice tools under the given server key (must match `mcp.json` server name). +pub fn dice_named(server_key: &str) -> NativeProvider { + NativeProvider { + server_name: server_key.to_string(), + tools: vec![ToolDef { + server_name: server_key.to_string(), + name: "roll_dice".to_string(), + description: Some( + "Roll a die with the given number of sides and return the result.".to_string(), + ), + input_schema: json!({ + "type": "object", + "properties": { + "sides": { + "type": "integer", + "description": "Number of sides (default 6, max 1 000 000)" + } + } + }), + direct_return: true, + }], + handler: handle_dice, + } +} + +pub fn dice() -> NativeProvider { + dice_named("dice") +} + +fn handle_dice(_tool_name: &str, args: &Value) -> Result<String, String> { + let sides = args + .get("sides") + .and_then(|v| v.as_u64()) + .unwrap_or(6) + .clamp(2, MAX_SIDES); + + let result = fastrand::u64(1..=sides); + Ok(format!("Rolled a d{sides}: {result}")) +} + +/// Resolve `id` from `mcp.json` (`type: native`) into a provider under `server_key`. +pub fn native_for(server_key: &str, id: &str) -> Result<NativeProvider, String> { + match id { + "dice" => Ok(dice_named(server_key)), + _ => Err(format!("unknown native id: {id}")), + } +} diff --git a/src-tauri/src/modules/mcp/protocol.rs b/src-tauri/src/modules/mcp/protocol.rs new file mode 100644 index 0000000..4f78322 --- /dev/null +++ b/src-tauri/src/modules/mcp/protocol.rs @@ -0,0 +1,43 @@ +//! Minimal JSON-RPC 2.0 for MCP over stdio.
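+//! +//! A request such as `{"jsonrpc":"2.0","id":1,"method":"tools/list"}` is answered by a response carrying the same `id`; notifications omit `id` and get no response.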
+ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Serialize)] +pub struct JsonRpcRequest<'a> { + pub jsonrpc: &'static str, + pub id: u64, + pub method: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option<Value>, +} + +impl<'a> JsonRpcRequest<'a> { + pub fn new(id: u64, method: &'a str, params: Option<Value>) -> Self { + Self { + jsonrpc: "2.0", + id, + method, + params, + } + } +} + +#[derive(Debug, Deserialize)] +pub struct JsonRpcResponse { + #[allow(dead_code)] + pub jsonrpc: Option<String>, + #[allow(dead_code)] + pub id: Option<u64>, + #[serde(default)] + pub result: Option<Value>, + #[serde(default)] + pub error: Option<JsonRpcError>, +} + +#[derive(Debug, Deserialize)] +pub struct JsonRpcError { + #[allow(dead_code)] + pub code: i64, + pub message: String, +} diff --git a/src-tauri/src/modules/mcp/registry.rs b/src-tauri/src/modules/mcp/registry.rs new file mode 100644 index 0000000..19c4ff4 --- /dev/null +++ b/src-tauri/src/modules/mcp/registry.rs @@ -0,0 +1,173 @@ +use super::client::McpClient; +use super::native::NativeProvider; +use super::types::ToolDef; +use serde_json::{json, Value}; +use std::sync::Arc; + +#[derive(Clone)] +pub enum Provider { + Native(Arc<NativeProvider>), + Mcp(Arc<McpClient>), +} + +impl Provider { + pub fn server_name(&self) -> &str { + match self { + Provider::Native(n) => &n.server_name, + Provider::Mcp(c) => &c.server_name, + } + } + + pub fn tools(&self) -> &[ToolDef] { + match self { + Provider::Native(n) => &n.tools, + Provider::Mcp(c) => &c.tools, + } + } + + pub async fn call_tool(&self, name: &str, args: Value) -> Result<String, String> { + match self { + Provider::Native(n) => n.call(name, &args), + Provider::Mcp(c) => c.call_tool(name, args).await, + } + } +} + +pub struct ToolRegistry { + providers: Vec<Provider>, + cached_ollama_tools: Value, + cached_tool_names: Vec<String>, +} + +impl Default for ToolRegistry { + fn default() -> Self { + Self { + providers: Vec::new(), + cached_ollama_tools: Value::Array(Vec::new()), + cached_tool_names: Vec::new(), + } + } +} + +impl ToolRegistry { + pub fn new(providers: Vec<Provider>) -> Self { + let cached_ollama_tools = build_ollama_tools(&providers); + let cached_tool_names = providers + .iter() + .flat_map(|p| p.tools().iter()) + .filter(|t| should_expose_to_model(t)) + .map(|t| t.name.clone()) + .collect(); + Self { + providers, + cached_ollama_tools, + cached_tool_names, + } + } + + pub fn all_tools(&self) -> Vec<ToolDef> { + self.providers + .iter() + .flat_map(|p| p.tools().iter()) + .filter(|t| should_expose_to_model(t)) + .cloned() + .collect() + } + + pub fn ollama_tools(&self) -> Value { + self.cached_ollama_tools.clone() + } + + pub fn tool_names(&self) -> &[String] { + &self.cached_tool_names + } + + pub fn is_empty(&self) -> bool { + self.cached_tool_names.is_empty() + } + + pub async fn call_tool(&self, name: &str, args: Value) -> Result<(String, bool), String> { + let (provider, tool, direct) = self.resolve_tool(name)?; + let text = provider.call_tool(&tool, args).await?; + Ok((text, direct)) + } + + pub fn resolve_tool(&self, name: &str) -> Result<(Provider, String, bool), String> { + let (server, tool) = match name.split_once('.') { + Some((s, t)) => (Some(s), t), + None => (None, name), + }; + + if server.is_none() { + let mut found: Vec<(&Provider, &ToolDef)> = Vec::new(); + for provider in &self.providers { + if let Some(def) = provider.tools().iter().find(|t| t.name == tool) { + found.push((provider, def)); + } + } + return match found.len() { + 0 => Err(format!("tool not found: {name}")), + 1 => { + let (p, d) = found[0]; + Ok((p.clone(),
tool.to_string(), d.direct_return)) + } + _ => { + let servers: Vec<_> = found.iter().map(|(p, _)| p.server_name()).collect(); + Err(format!( + "ambiguous tool `{tool}`: matches servers {}", + servers.join(", ") + )) + } + }; + } + + if let Some(s) = server { + let key = s.trim(); + for provider in &self.providers { + if !provider.server_name().eq_ignore_ascii_case(key) { + continue; + } + if let Some(def) = provider.tools().iter().find(|t| t.name == tool) { + return Ok((provider.clone(), tool.to_string(), def.direct_return)); + } + } + } + Err(format!("tool not found: {name}")) + } +} + +fn should_expose_to_model(tool: &ToolDef) -> bool { + let desc = tool.description.as_deref().unwrap_or(""); + if desc.to_ascii_uppercase().contains("DEPRECATED") { + return false; + } + !REDUNDANT_TOOLS.contains(&tool.name.as_str()) +} + +/// Tools that add noise without value for a small local model. +const REDUNDANT_TOOLS: &[&str] = &[ + "read_media_file", + "read_multiple_files", + "list_directory_with_sizes", + "directory_tree", + "list_allowed_directories", +]; + +fn build_ollama_tools(providers: &[Provider]) -> Value { + let arr: Vec<Value> = providers + .iter() + .flat_map(|p| p.tools().iter()) + .filter(|t| should_expose_to_model(t)) + .map(|t| { + json!({ + "type": "function", + "function": { + "name": t.name, + "description": t.description.clone().unwrap_or_default(), + "parameters": t.input_schema, + } + }) + }) + .collect(); + Value::Array(arr) +} diff --git a/src-tauri/src/modules/mcp/service.rs b/src-tauri/src/modules/mcp/service.rs new file mode 100644 index 0000000..a156fe0 --- /dev/null +++ b/src-tauri/src/modules/mcp/service.rs @@ -0,0 +1,175 @@ +//! Load `mcp.json` and build [`ToolRegistry`] — same code path for `native` and `stdio` +//! (Docker is just `command` + `args` on a `stdio` entry). + +use super::client::McpClient; +use super::native; +use super::registry::{Provider, ToolRegistry}; +use super::types::{McpConfig, ServerEntry}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +const FILESYSTEM_SERVER_KEY: &str = "filesystem"; +const FILESYSTEM_PKG: &str = "@modelcontextprotocol/server-filesystem"; + +/// Prefer project `mcp.json` under `src-tauri/` (or crate-root `mcp.json`) by walking up from +/// [`std::env::current_exe`], so resolution does not depend on process CWD. Falls back to +/// `mcp.json` next to `connection.json` in app data.
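+/// (In a dev build the executable sits under `target/`, so a few parent hops reach the repo root where `src-tauri/mcp.json` lives; the 16-step cap bounds the walk.)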
+pub fn resolve_mcp_config_path(store_path: &Path) -> (PathBuf, &'static str) { + if let Ok(exe) = std::env::current_exe() { + let mut dir = exe.parent().map(Path::to_path_buf); + for _ in 0..16 { + let Some(ref d) = dir else { + break; + }; + let from_repo_root = d.join("src-tauri").join("mcp.json"); + if from_repo_root.exists() { + return (from_repo_root, "project"); + } + let in_crate_root = d.join("mcp.json"); + if d.join("Cargo.toml").exists() && in_crate_root.exists() { + return (in_crate_root, "project"); + } + dir = d.parent().map(Path::to_path_buf); + } + } + + let app_path = store_path + .parent() + .map(|p| p.join("mcp.json")) + .unwrap_or_else(|| PathBuf::from("mcp.json")); + (app_path, "app_data") +} + +pub fn read_config(path: &Path) -> Result<McpConfig, String> { + let raw = std::fs::read_to_string(path).map_err(|e| format!("read mcp.json: {e}"))?; + serde_json::from_str(&raw).map_err(|e| { + format!( + "parse mcp.json: {e} — every server entry needs a \"type\" field (\"native\" or \"stdio\")" + ) + }) +} + +pub fn save_config(path: &Path, cfg: &McpConfig) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create parent dirs for mcp.json: {e}"))?; + } + let pretty = serde_json::to_string_pretty(cfg).map_err(|e| format!("encode mcp.json: {e}"))?; + std::fs::write(path, pretty).map_err(|e| format!("write mcp.json: {e}")) +} + +/// All allowed folders for the official MCP filesystem stdio server (paths after the package arg). +pub fn filesystem_allowed_paths(cfg: &McpConfig) -> Vec<String> { + let Some(ServerEntry::Stdio { args, .. }) = cfg.servers.get(FILESYSTEM_SERVER_KEY) else { + return Vec::new(); + }; + let Some(pkg_idx) = args.iter().position(|a| a.contains("server-filesystem")) else { + return Vec::new(); + }; + args[pkg_idx + 1..].to_vec() +} + +pub fn set_filesystem_allowed_paths(cfg: &mut McpConfig, paths: &[String]) { + let mut args = vec!["-y".into(), FILESYSTEM_PKG.into()]; + args.extend(paths.iter().map(|p| p.trim().to_string())); + let entry = ServerEntry::Stdio { + command: "npx".into(), + args, + env: Default::default(), + direct_return: true, + }; + cfg.servers.insert(FILESYSTEM_SERVER_KEY.into(), entry); +} + +fn default_config_value() -> serde_json::Value { + serde_json::json!({ + "servers": { + "dice": { + "type": "native", + "id": "dice" + } + } + }) +} + +pub fn load_or_init_config(path: &Path) -> Result<McpConfig, String> { + if path.exists() { + return read_config(path); + } + + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create parent dirs for mcp.json: {e}"))?; + } + let default = default_config_value(); + let pretty = serde_json::to_string_pretty(&default) + .map_err(|e| format!("encode default mcp.json: {e}"))?; + std::fs::write(path, pretty).map_err(|e| format!("write mcp.json: {e}"))?; + serde_json::from_value(default).map_err(|e| e.to_string()) +} + +/// Connect every server in order (stable `BTreeMap` keys). Returns registry + status lines.
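+/// A server that fails to connect only contributes a `… failed: …` status line; it never blocks the remaining servers.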
+pub async fn build_registry(cfg: &McpConfig) -> (ToolRegistry, Vec<String>) { + let mut providers = Vec::new(); + let mut status = Vec::new(); + + for (server_key, entry) in &cfg.servers { + match entry { + ServerEntry::Native { id } => match native::native_for(server_key, id) { + Ok(p) => { + let n = p.tools.len(); + providers.push(Provider::Native(Arc::new(p))); + status.push(format!( + "{server_key} native ({n} tool{})", + if n == 1 { "" } else { "s" } + )); + } + Err(e) => status.push(format!("{server_key} native failed: {e}")), + }, + ServerEntry::Stdio { + command, + args, + env, + direct_return, + } => match McpClient::connect( + server_key.clone(), + command.clone(), + args.clone(), + env.clone(), + *direct_return, + ) + .await + { + Ok(client) => { + let n = client.tools.len(); + let dr = if *direct_return { " direct_return" } else { "" }; + providers.push(Provider::Mcp(Arc::new(client))); + status.push(format!( + "{server_key} stdio ({n} tool{}{dr})", + if n == 1 { "" } else { "s" } + )); + } + Err(e) => status.push(format!("{server_key} stdio failed: {e}")), + }, + } + } + + (ToolRegistry::new(providers), status) +} + +/// Replace in-memory tools after a config change (writes should use [`save_config`] first). +pub async fn rebuild_registry_into_state(state: &crate::shared::state::AppState, cfg: &McpConfig) { + *state.cached_filesystem_paths.write().await = filesystem_allowed_paths(cfg); + let (registry, status) = build_registry(cfg).await; + for line in status { + state.emit_log("mcp", &line).await; + } + let n = registry.tool_names().len(); + *state.mcp.write().await = registry; + state + .emit_log( + "mcp", + &format!("ready ({n} tool{})", if n == 1 { "" } else { "s" }), + ) + .await; +} diff --git a/src-tauri/src/modules/mcp/transport.rs b/src-tauri/src/modules/mcp/transport.rs new file mode 100644 index 0000000..2df1f7b --- /dev/null +++ b/src-tauri/src/modules/mcp/transport.rs @@ -0,0 +1,153 @@ +//! Line-delimited JSON over stdin/stdout of a child process.
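+//! +//! Writers hold the stdin lock just long enough to push one `\n`-terminated JSON object; a reader task routes each response line back to its caller by `id`.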
+ +use super::protocol::{JsonRpcRequest, JsonRpcResponse}; +use serde_json::Value; +use std::collections::HashMap; +use std::process::Stdio; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::{oneshot, Mutex}; + +type Pending = Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>>; + +pub struct StdioTransport { + #[allow(dead_code)] + child: Child, + stdin: Mutex<ChildStdin>, + next_id: AtomicU64, + pending: Pending, +} + +impl StdioTransport { + pub async fn spawn( + command: &str, + args: &[String], + env: &HashMap<String, String>, + ) -> Result<Self, String> { + let mut cmd = Command::new(command); + cmd.args(args) + .envs(env) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + let mut child = cmd + .spawn() + .map_err(|e| format!("spawn `{command}` failed: {e}"))?; + + let stdin = child.stdin.take().ok_or("no stdin")?; + let stdout = child.stdout.take().ok_or("no stdout")?; + let stderr = child.stderr.take().ok_or("no stderr")?; + + let pending: Pending = Arc::new(Mutex::new(HashMap::new())); + + { + let pending = pending.clone(); + tokio::spawn(async move { + let mut lines = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = lines.next_line().await { + let line = line.trim(); + if line.is_empty() { + continue; + } + let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(line) else { + continue; + }; + if let Some(id) = resp.id { + if let Some(tx) = pending.lock().await.remove(&id) { + let _ = tx.send(resp); + } + } + } + }); + } + + tokio::spawn(async move { + let mut lines = BufReader::new(stderr).lines(); + loop { + match lines.next_line().await { + Ok(Some(line)) => { + let line = line.trim(); + if line.is_empty() { + continue; + } + log::debug!("mcp stderr: {line}"); + } + Ok(None) => break, + Err(e) => { + log::debug!("mcp stderr read error: {e}"); + break; + } + } + } + }); + + Ok(Self { + child, + stdin: Mutex::new(stdin), + next_id: AtomicU64::new(1), + pending, + }) + } + + pub async fn call(&self, method: &str, params: Option<Value>) -> Result<Value, String> { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let req = JsonRpcRequest::new(id, method, params); + let mut payload = serde_json::to_vec(&req).map_err(|e| format!("encode request: {e}"))?; + payload.push(b'\n'); + + let (tx, rx) = oneshot::channel(); + self.pending.lock().await.insert(id, tx); + + { + let mut stdin = self.stdin.lock().await; + stdin + .write_all(&payload) + .await + .map_err(|e| format!("write stdin: {e}"))?; + stdin.flush().await.map_err(|e| format!("flush: {e}"))?; + } + + let resp = match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await { + Err(_) => { + self.pending.lock().await.remove(&id); + return Err("mcp call timed out".to_string()); + } + Ok(rx_result) => match rx_result { + Err(_) => { + self.pending.lock().await.remove(&id); + return Err("mcp response channel dropped".to_string()); + } + Ok(resp) => resp, + }, + }; + self.pending.lock().await.remove(&id); + + if let Some(err) = resp.error { + return Err(format!("mcp error: {}", err.message)); + } + Ok(resp.result.unwrap_or(Value::Null)) + } + + pub async fn notify(&self, method: &str, params: Option<Value>) -> Result<(), String> { + let mut msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": method, + }); + if let Some(p) = params { + msg["params"] = p; + } + let mut payload = serde_json::to_vec(&msg).map_err(|e| e.to_string())?; + payload.push(b'\n'); + let mut stdin = self.stdin.lock().await; + stdin + .write_all(&payload)
+            .await
+            .map_err(|e| format!("write stdin: {e}"))?;
+        stdin.flush().await.map_err(|e| format!("flush: {e}"))?;
+        Ok(())
+    }
+}
diff --git a/src-tauri/src/modules/mcp/types.rs b/src-tauri/src/modules/mcp/types.rs
new file mode 100644
index 0000000..66a96f8
--- /dev/null
+++ b/src-tauri/src/modules/mcp/types.rs
@@ -0,0 +1,43 @@
+use serde::{Deserialize, Serialize};
+use std::collections::{BTreeMap, HashMap};
+
+/// Root config: `src-tauri/mcp.json` in dev, or `mcp.json` in the app data dir (next to `connection.json`).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct McpConfig {
+    #[serde(default)]
+    pub servers: BTreeMap<String, ServerEntry>,
+}
+
+/// One logical MCP server. Same top-level shape for every backend: `type` picks the loader.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ServerEntry {
+    /// In-process tool pack; `id` selects a built-in (e.g. `dice`).
+    Native { id: String },
+    /// Child process speaking MCP over stdio (`docker run … -i` is just command + args).
+    Stdio {
+        command: String,
+        #[serde(default)]
+        args: Vec<String>,
+        #[serde(default)]
+        env: HashMap<String, String>,
+        /// When true, tool results are returned directly to the user without
+        /// sending them back to the model for summarisation.
+        #[serde(default)]
+        direct_return: bool,
+    },
+}
+
+/// Definition of a single tool, regardless of where it runs.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ToolDef {
+    #[serde(skip)]
+    pub server_name: String,
+    pub name: String,
+    #[serde(default)]
+    pub description: Option<String>,
+    #[serde(default, rename = "inputSchema")]
+    pub input_schema: serde_json::Value,
+    #[serde(skip)]
+    pub direct_return: bool,
+}
diff --git a/src-tauri/src/modules/mod.rs b/src-tauri/src/modules/mod.rs
index aa58b0c..4a4ea12 100644
--- a/src-tauri/src/modules/mod.rs
+++ b/src-tauri/src/modules/mod.rs
@@ -1,2 +1,3 @@
 pub mod bot;
+pub mod mcp;
 pub mod ollama;
diff --git a/src-tauri/src/modules/ollama/service.rs b/src-tauri/src/modules/ollama/service.rs
index 6b07848..a619b74 100644
--- a/src-tauri/src/modules/ollama/service.rs
+++ b/src-tauri/src/modules/ollama/service.rs
@@ -7,58 +7,177 @@ fn http_client() -> &'static reqwest::Client {
     HTTP.get_or_init(reqwest::Client::new)
 }
 
-/// Returns the currently loaded model (from `/api/ps`), falling back to the
-/// first pulled model (from `/api/tags`) if nothing is loaded yet.
-pub async fn active_model() -> Result<String, String> {
+#[derive(Debug, Clone)]
+pub struct ModelCatalog {
+    pub active: Option<String>,
+    pub models: Vec<String>,
+}
+
+/// Returns the active model and the full pulled model list (`/api/tags`).
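+///
+/// Illustrative result (the model names are examples, not defaults):
+/// `ModelCatalog { active: Some("qwen3:8b".into()), models: vec!["qwen3:8b".into()] }`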
+pub async fn model_catalog(timeout_ms: u64) -> Result<ModelCatalog, String> {
     let client = http_client();
-    let timeout = std::time::Duration::from_secs(5);
-
-    if let Ok(resp) = client.get(OLLAMA_PS_URL).timeout(timeout).send().await {
-        if let Ok(body) = resp.json::<serde_json::Value>().await {
-            if let Some(name) = body["models"]
-                .as_array()
-                .and_then(|arr| arr.first())
-                .and_then(|m| m["name"].as_str())
-            {
-                return Ok(name.to_string());
+    let timeout = std::time::Duration::from_millis(timeout_ms);
+
+    let mut active: Option<String> = None;
+    match client.get(OLLAMA_PS_URL).timeout(timeout).send().await {
+        Ok(resp) => {
+            if !resp.status().is_success() {
+                log::warn!(
+                    "ollama {}: non-success HTTP {}",
+                    OLLAMA_PS_URL,
+                    resp.status()
+                );
+            } else {
+                match resp.json::<serde_json::Value>().await {
+                    Ok(body) => {
+                        active = body["models"]
+                            .as_array()
+                            .and_then(|arr| arr.first())
+                            .and_then(|m| m["name"].as_str())
+                            .map(|s| s.to_string());
+                    }
+                    Err(e) => {
+                        log::warn!("ollama {}: JSON decode error: {e}", OLLAMA_PS_URL);
+                    }
+                }
             }
         }
+        Err(e) => log::warn!("ollama {}: request error: {e}", OLLAMA_PS_URL),
     }
 
-    let resp = client
-        .get(OLLAMA_TAGS_URL)
-        .timeout(timeout)
-        .send()
-        .await
-        .map_err(|e| format!("ollama unreachable: {e}"))?;
+    let mut models: Vec<String> = Vec::new();
+    match client.get(OLLAMA_TAGS_URL).timeout(timeout).send().await {
+        Ok(resp) => {
+            if !resp.status().is_success() {
+                log::warn!(
+                    "ollama {}: non-success HTTP {}",
+                    OLLAMA_TAGS_URL,
+                    resp.status()
+                );
+            } else {
+                match resp.json::<serde_json::Value>().await {
+                    Ok(body) => {
+                        models = body["models"]
+                            .as_array()
+                            .map(|arr| {
+                                arr.iter()
+                                    .filter_map(|m| m["name"].as_str().map(|s| s.to_string()))
+                                    .collect()
+                            })
+                            .unwrap_or_default();
+                    }
+                    Err(e) => {
+                        log::warn!("ollama {}: JSON decode error: {e}", OLLAMA_TAGS_URL);
+                    }
+                }
+            }
+        }
+        Err(e) => log::warn!("ollama {}: request error: {e}", OLLAMA_TAGS_URL),
+    }
 
-    let body: serde_json::Value = resp.json().await.map_err(|e| e.to_string())?;
-    body["models"]
-        .as_array()
-        .and_then(|arr| arr.first())
-        .and_then(|m| m["name"].as_str())
-        .map(|s| s.to_string())
+    if let Some(ref a) = active {
+        if !models.iter().any(|m| m == a) {
+            models.insert(0, a.clone());
+        }
+    }
+
+    if active.is_none() && models.is_empty() {
+        return Err("ollama unreachable: no active model and no pulled models".to_string());
+    }
+
+    Ok(ModelCatalog { active, models })
+}
+
+/// Returns the currently loaded model (from `/api/ps`), falling back to the
+/// first pulled model (from `/api/tags`) if nothing is loaded yet.
+pub async fn active_model() -> Result<String, String> {
+    let catalog = model_catalog(5000).await?;
+    if let Some(active) = catalog.active {
+        return Ok(active);
+    }
+    catalog
+        .models
+        .first()
+        .cloned()
        .ok_or_else(|| "no models pulled in ollama".to_string())
 }
 
-pub async fn chat(model: &str, prompt: &str) -> Result<String, String> {
-    let payload = serde_json::json!({
+/// Outcome of a single chat call so the caller knows whether tools were included in the request.
+pub struct ChatResult {
+    pub message: serde_json::Value,
+    /// `true` when this request included a non-empty `tools` payload; `false` for plain chat
+    /// (including transparent fallback when the model rejects tools).
+    pub tools_sent: bool,
+}
+
+/// Tool-aware chat for the agent loop. Sends a full message history plus a
+/// list of tool definitions and returns the raw assistant message (which may
+/// contain `tool_calls`). Caller is responsible for executing tools and
+/// looping.
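+///
+/// Illustrative `tools` entry in Ollama's function-tool shape (the `roll_dice`
+/// schema here is an example, not the canonical definition):
+///
+/// ```json
+/// [{"type": "function",
+///   "function": {"name": "roll_dice",
+///                "description": "Roll a die",
+///                "parameters": {"type": "object",
+///                               "properties": {"sides": {"type": "integer"}}}}}]
+/// ```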
+///
+/// If the model rejects tools (HTTP 400 "does not support tools"), the request
+/// is transparently retried without tools so older models still work.
+pub async fn chat_with_tools(
+    model: &str,
+    messages: &serde_json::Value,
+    tools: &serde_json::Value,
+) -> Result<ChatResult, String> {
+    let has_tools = tools.as_array().is_some_and(|a| !a.is_empty());
+
+    let mut payload = serde_json::json!({
         "model": model,
-        "messages": [{"role": "user", "content": format!("Think fast and answer extremely short. If you don't know the answer, say you don't know. Question: {prompt}")}],
+        "messages": messages,
         "stream": false,
     });
+    if has_tools {
+        payload["tools"] = tools.clone();
+    }
+
+    let (status, body) = post_chat(&payload).await?;
+    if !status.is_success() {
+        let err_text = body["error"].as_str().unwrap_or("");
+        if has_tools && err_text.contains("does not support tools") {
+            let plain = serde_json::json!({
+                "model": model,
+                "messages": messages,
+                "stream": false,
+            });
+            let (st, b) = post_chat(&plain).await?;
+            if !st.is_success() {
+                return Err(format!("ollama chat HTTP {st}: {b}"));
+            }
+            return Ok(ChatResult {
+                message: extract_message(&b)?,
+                tools_sent: false,
+            });
+        }
+        return Err(format!("ollama chat HTTP {status}: {body}"));
+    }
+
+    Ok(ChatResult {
+        message: extract_message(&body)?,
+        tools_sent: has_tools,
+    })
+}
+
+async fn post_chat(
+    payload: &serde_json::Value,
+) -> Result<(reqwest::StatusCode, serde_json::Value), String> {
     let resp = http_client()
         .post(OLLAMA_CHAT_URL)
-        .json(&payload)
+        .json(payload)
         .timeout(std::time::Duration::from_secs(120))
         .send()
         .await
         .map_err(|e| e.to_string())?;
-
+    let status = resp.status();
     let body: serde_json::Value = resp.json().await.map_err(|e| e.to_string())?;
-    body["message"]["content"]
-        .as_str()
-        .map(|s| s.to_string())
-        .ok_or_else(|| "unexpected ollama response shape".to_string())
+    Ok((status, body))
+}
+
+fn extract_message(body: &serde_json::Value) -> Result<serde_json::Value, String> {
+    body.get("message")
+        .cloned()
+        .ok_or_else(|| format!("ollama protocol error: missing `message` in response: {body}"))
+}
diff --git a/src-tauri/src/shared/state.rs b/src-tauri/src/shared/state.rs
index d4d5f4e..850b605 100644
--- a/src-tauri/src/shared/state.rs
+++ b/src-tauri/src/shared/state.rs
@@ -1,9 +1,10 @@
+use crate::modules::mcp::registry::ToolRegistry;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use std::path::PathBuf;
 use std::sync::Arc;
 use tauri::Emitter;
-use tokio::sync::{Mutex, Notify};
+use tokio::sync::{Mutex, Notify, RwLock};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ConnectionData {
@@ -27,11 +28,20 @@ pub struct AppState {
     pub bot_running: Arc<Mutex<bool>>,
     pub log_tx: Arc<Mutex<Option<tokio::sync::broadcast::Sender<LogLine>>>>,
     pub store_path: PathBuf,
+    /// Resolved `mcp.json` path (project `src-tauri/mcp.json` when present, else app data dir).
+    pub mcp_config_path: PathBuf,
+    /// `"project"` or `"app_data"` — for dashboard copy only.
+    pub mcp_config_source: String,
     pub app_handle: Arc<Mutex<Option<tauri::AppHandle>>>,
+    pub mcp: Arc<RwLock<ToolRegistry>>,
+    pub mcp_config_mutex: Arc<Mutex<()>>,
+    pub preferred_ollama_model: Arc<RwLock<Option<String>>>,
+    /// Allowed filesystem paths from `mcp.json` (updated with the MCP rebuild); avoids a disk read per agent turn.
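+    /// Illustrative value: `["/home/me/notes"]`, as produced by `filesystem_allowed_paths`.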
+    pub cached_filesystem_paths: Arc<RwLock<Vec<String>>>,
 }
 
 impl AppState {
-    pub fn new(store_path: PathBuf) -> Self {
+    pub fn new(store_path: PathBuf, mcp_config_path: PathBuf, mcp_config_source: String) -> Self {
         let (log_tx, _) = tokio::sync::broadcast::channel(256);
         Self {
             connection: Arc::new(Mutex::new(None)),
@@ -39,7 +49,13 @@ impl AppState {
             bot_running: Arc::new(Mutex::new(false)),
             log_tx: Arc::new(Mutex::new(Some(log_tx))),
             store_path,
+            mcp_config_path,
+            mcp_config_source,
             app_handle: Arc::new(Mutex::new(None)),
+            mcp: Arc::new(RwLock::new(ToolRegistry::default())),
+            mcp_config_mutex: Arc::new(Mutex::new(())),
+            preferred_ollama_model: Arc::new(RwLock::new(None)),
+            cached_filesystem_paths: Arc::new(RwLock::new(Vec::new())),
         }
     }
diff --git a/src-tauri/tests/mcp_tools.rs b/src-tauri/tests/mcp_tools.rs
new file mode 100644
index 0000000..9955b90
--- /dev/null
+++ b/src-tauri/tests/mcp_tools.rs
@@ -0,0 +1,102 @@
+//! Integration tests for MCP tooling.
+
+use pengine_lib::modules::mcp::{native, service};
+use serde_json::json;
+use std::path::PathBuf;
+
+fn temp_mcp_path(name: &str) -> PathBuf {
+    let mut p = std::env::temp_dir();
+    let nanos = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.as_nanos())
+        .unwrap_or(0);
+    p.push(format!("pengine-mcp-{name}-{nanos}.json"));
+    p
+}
+
+#[test]
+fn dice_returns_valid_result() {
+    let provider = native::dice();
+    assert_eq!(provider.tools.len(), 1);
+    assert_eq!(provider.tools[0].name, "roll_dice");
+    assert!(provider.tools[0].direct_return);
+
+    let out = provider
+        .call("roll_dice", &json!({"sides": 20}))
+        .expect("dice call");
+    assert!(out.starts_with("Rolled a d20: "), "got: {out}");
+
+    let num: u64 = out.trim_start_matches("Rolled a d20: ").parse().unwrap();
+    assert!((1..=20).contains(&num));
+}
+
+#[test]
+fn dice_clamps_invalid_sides() {
+    let provider = native::dice();
+
+    let out = provider
+        .call("roll_dice", &json!({"sides": 0}))
+        .expect("sides=0");
+    assert!(out.starts_with("Rolled a d2: "), "clamped to 2, got: {out}");
+
+    let out = provider
+        .call("roll_dice", &json!({"sides": 9999999}))
+        .expect("sides=9999999");
+    assert!(
+        out.starts_with("Rolled a d1000000: "),
+        "clamped to MAX, got: {out}"
+    );
+}
+
+#[test]
+fn dice_rejects_unknown_tool() {
+    let provider = native::dice();
+    let err = provider.call("unknown", &json!({})).unwrap_err();
+    assert!(err.contains("unknown native tool"));
+}
+
+#[tokio::test]
+async fn mcp_json_loads_native_dice() {
+    let path = temp_mcp_path("load");
+    let cfg = service::load_or_init_config(&path).expect("load_or_init");
+    assert!(cfg.servers.contains_key("dice"));
+
+    let (reg, status) = service::build_registry(&cfg).await;
+    assert!(status
+        .iter()
+        .any(|s| s.contains("dice") && s.contains("native")));
+    assert_eq!(reg.tool_names(), &["roll_dice"]);
+    let _ = std::fs::remove_file(path);
+}
+
+#[tokio::test]
+async fn native_dice_callable_through_registry_from_config() {
+    let path = temp_mcp_path("registry");
+    let cfg = service::load_or_init_config(&path).expect("load_or_init");
+    let (reg, _) = service::build_registry(&cfg).await;
+    let (text, direct) = reg
+        .call_tool("roll_dice", json!({"sides": 6}))
+        .await
+        .expect("call roll_dice");
+    assert!(text.starts_with("Rolled a d6: "), "got: {text}");
+    assert!(direct, "dice should be direct_return");
+    let _ = std::fs::remove_file(path);
+}
+
+#[test]
+fn native_server_key_rename_in_config() {
+    let path = temp_mcp_path("rename");
+    std::fs::write(
+        &path,
+        r#"{"servers":{"mydice":{"type":"native","id":"dice"}}}"#,
+    )
+    .unwrap();
+    let cfg = service::load_or_init_config(&path).expect("load");
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    let (reg, _) = rt.block_on(service::build_registry(&cfg));
+    assert_eq!(reg.all_tools()[0].server_name, "mydice");
+    let _ = std::fs::remove_file(path);
+}
diff --git a/src/modules/bot/components/SetupWizard.tsx b/src/modules/bot/components/SetupWizard.tsx
index 394e5bc..934fb17 100644
--- a/src/modules/bot/components/SetupWizard.tsx
+++ b/src/modules/bot/components/SetupWizard.tsx
@@ -243,8 +243,12 @@ export function SetupWizard({ onStepChange, onCompleteSetup }: SetupWizardProps)

               {`curl -fsSL https://ollama.com/install.sh | sh
-ollama pull llama3.2`}
+ollama pull qwen3:8b`}
             
+

+              Recommended: qwen3:8b — good balance of speed
+              and tool-calling support.
+

{ollamaChecking && (

Detecting Ollama…

@@ -262,7 +266,7 @@ ollama pull llama3.2`}
       {ollamaReachable === true && !ollamaModel && (

              Ollama is running but no model is pulled yet. Run{" "}
-              ollama pull llama3.2 first.
+              ollama pull qwen3:8b first.

       )}
diff --git a/src/modules/bot/components/TerminalPreview.tsx b/src/modules/bot/components/TerminalPreview.tsx
index f9c3220..9511a34 100644
--- a/src/modules/bot/components/TerminalPreview.tsx
+++ b/src/modules/bot/components/TerminalPreview.tsx
@@ -13,6 +13,7 @@ function kindClass(kind: string) {
   if (kind === "ok") return "bg-emerald-400/10 text-emerald-300";
   if (kind === "run") return "bg-sky-400/10 text-sky-300";
   if (kind === "tool") return "bg-yellow-400/10 text-yellow-200";
+  if (kind === "time") return "bg-fuchsia-400/10 text-fuchsia-200";
   if (kind === "reply") return "bg-violet-400/10 text-violet-300";
   if (kind === "msg") return "bg-cyan-400/10 text-cyan-300";
   return "bg-slate-400/10 text-slate-300";
@@ -121,7 +122,7 @@ export function TerminalPreview() {
{lines.map((line, i) => (
diff --git a/src/modules/mcp/components/AddServerForm.tsx b/src/modules/mcp/components/AddServerForm.tsx
new file mode 100644
index 0000000..2a61dbc
--- /dev/null
+++ b/src/modules/mcp/components/AddServerForm.tsx
@@ -0,0 +1,308 @@
+import { useState } from "react";
+import type { ServerEntry } from "..";
+
+type Props = {
+  busy: boolean;
+  onAdd: (name: string, entry: ServerEntry) => Promise<void>;
+};
+
+type Mode = "paste" | "form";
+
+export function AddServerForm({ busy, onAdd }: Props) {
+  const [open, setOpen] = useState(false);
+  const [mode, setMode] = useState<Mode>("paste");
+  const [error, setError] = useState<string | null>(null);
+
+  // Paste mode state
+  const [jsonText, setJsonText] = useState("");
+  const [pasteName, setPasteName] = useState("");
+
+  // Form mode state
+  const [formName, setFormName] = useState("");
+  const [command, setCommand] = useState("");
+  const [argsText, setArgsText] = useState("");
+  const [envText, setEnvText] = useState("");
+  const [directReturn, setDirectReturn] = useState(false);
+
+  const reset = () => {
+    setJsonText("");
+    setPasteName("");
+    setFormName("");
+    setCommand("");
+    setArgsText("");
+    setEnvText("");
+    setDirectReturn(false);
+    setError(null);
+  };
+
+  const handlePasteSubmit = async () => {
+    setError(null);
+    let parsed: unknown;
+    try {
+      parsed = JSON.parse(jsonText);
+    } catch {
+      setError("Invalid JSON");
+      return;
+    }
+
+    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
+      setError("Expected a JSON object");
+      return;
+    }
+
+    const obj = parsed as Record<string, unknown>;
+
+    // Detect format: { "type": "stdio", ... } vs { "my-server": { "type": "stdio", ... } }
+    let name: string;
+    let entry: ServerEntry;
+
+    if ("type" in obj && (obj.type === "stdio" || obj.type === "native")) {
+      // Direct entry — need a name from the input
+      if (!pasteName.trim()) {
+        setError("Enter a server name (the JSON has no key wrapper)");
+        return;
+      }
+      name = pasteName.trim();
+      entry = normalizeEntry(obj);
+    } else {
+      // Wrapped: { "server-name": { ... } }
+      const keys = Object.keys(obj);
+      if (keys.length !== 1) {
+        setError('Expected either a server entry or { "name": { ...entry } }');
+        return;
+      }
+      name = keys[0];
+      const inner = obj[name];
+      if (typeof inner !== "object" || inner === null || Array.isArray(inner)) {
+        setError(`Value for "${name}" is not a valid server entry`);
+        return;
+      }
+      entry = normalizeEntry(inner as Record<string, unknown>);
+    }
+
+    try {
+      await onAdd(name, entry);
+      reset();
+      setOpen(false);
+    } catch (e) {
+      setError(e instanceof Error ? e.message : "Could not add server");
+    }
+  };
+
+  const handleFormSubmit = async () => {
+    setError(null);
+    const name = formName.trim();
+    if (!name) {
+      setError("Server name is required");
+      return;
+    }
+    if (!command.trim()) {
+      setError("Command is required");
+      return;
+    }
+
+    const args = argsText
+      .split("\n")
+      .map((l) => l.trim())
+      .filter(Boolean);
+    const env: Record<string, string> = {};
+    for (const line of envText.split("\n")) {
+      const eq = line.indexOf("=");
+      if (eq > 0) env[line.slice(0, eq).trim()] = line.slice(eq + 1).trim();
+    }
+
+    try {
+      await onAdd(name, {
+        type: "stdio",
+        command: command.trim(),
+        args,
+        env,
+        direct_return: directReturn,
+      });
+      reset();
+      setOpen(false);
+    } catch (e) {
+      setError(e instanceof Error ? e.message : "Could not add server");
+    }
+  };
+
+  const inputClass =
+    "w-full rounded-lg border border-white/15 bg-white/5 px-2.5 py-2 font-mono text-xs text-white outline-none placeholder:text-white/25 focus:border-white/30";
+
+  if (!open) {
+    return (
+
+    );
+  }
+
+  return (
+
+

Add server

+ +
+ + {/* Mode tabs */} +
+ {(["paste", "form"] as const).map((m) => ( + + ))} +
+ + {mode === "paste" && ( +
+
+            <input
+              value={pasteName}
+              onChange={(e) => setPasteName(e.target.value)}
+              placeholder="my-server"
+              className={inputClass}
+            />
+
+
+ +