diff --git a/.gitignore b/.gitignore index d44a52e..6562d27 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ config.toml *.swp *.swo .env +rustbot.db* diff --git a/Cargo.lock b/Cargo.lock index 9079420..d95e7d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,6 +173,36 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "croner" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c344b0690c1ad1c7176fe18eb173e0c927008fdaaa256e40dfd43ddd149c0843" +dependencies = [ + "chrono", +] + +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "darling" version = "0.21.3" @@ -341,6 +371,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.3.0" @@ -541,6 +583,15 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.10.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -882,6 +933,30 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "jiff" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c867c356cc096b33f4981825ab281ecba3db0acefe60329f044c1789d94c6543" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7946b4325269738f270bb55b3c19ab5c5040525f83fd625259422a9d25d9be5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "js-sys" version = "0.3.85" @@ -910,6 +985,17 @@ version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +[[package]] +name = "libsqlite3-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb8270bb4060bd76c6e96f20c52d80620f1d82a3470885694e41e0f81ef6fe7" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1023,6 +1109,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num-traits" version 
= "0.2.19" @@ -1164,6 +1261,21 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + +[[package]] +name = "portable-atomic-util" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -1417,21 +1529,48 @@ dependencies = [ "syn", ] +[[package]] +name = "rusqlite" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e34486da88d8e051c7c0e23c3f15fd806ea8546260aa2fec247e97242ec143" +dependencies = [ + "bitflags", + "chrono", + "csv", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "jiff", + "libsqlite3-sys", + "serde_json", + "smallvec", + "time", + "url", + "uuid", +] + [[package]] name = "rustbot" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", + "chrono", "futures", "reqwest", "rmcp", + "rusqlite", "serde", "serde_json", + "sqlite-vec", "teloxide", "tokio", + "tokio-cron-scheduler", "toml", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -1727,6 +1866,15 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "sqlite-vec" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec77b84fb8dd5f0f8def127226db83b5d1152c5bf367f09af03998b76ba554a" +dependencies = [ + "cc", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -1743,7 +1891,6 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", "windows-sys 0.59.0", ] @@ -1993,6 +2140,21 @@ dependencies = [ "windows-sys 0.61.2", ] 
+[[package]] +name = "tokio-cron-scheduler" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a5597b569b4712cf78aa0c9ae29742461b7bda1e49c2a5fdad1d79bf022f8f0" +dependencies = [ + "chrono", + "croner", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "tokio-macros" version = "2.6.0" diff --git a/Cargo.toml b/Cargo.toml index 0b21683..2a5e409 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,21 @@ anyhow = "1" # Async utilities futures = "0.3" + +# Async trait support +async-trait = "0.1" + +# SQLite database with FTS5 and vector search support +rusqlite = { version = "0.34", features = ["bundled", "modern-full"] } + +# UUID for message/entry IDs +uuid = { version = "1", features = ["v4"] } + +# Chrono for timestamps +chrono = { version = "0.4", features = ["serde"] } + +# Background task scheduler +tokio-cron-scheduler = "0.13" + +# SQLite vector search extension +sqlite-vec = "0.1" diff --git a/README.md b/README.md index daf0c60..b5992f9 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ See [`config.example.toml`](config.example.toml) for all options. 
| `openrouter.api_key` | OpenRouter API key | | `openrouter.model` | LLM model ID (e.g., `qwen/qwen3-235b-a22b`) | | `sandbox.allowed_directory` | Directory for file/command operations | +| `embedding` (optional) | Vector search for memory | | `mcp_servers` | List of MCP servers to connect | ### MCP Server Configuration diff --git a/config.example.toml b/config.example.toml index 32197fd..af16942 100644 --- a/config.example.toml +++ b/config.example.toml @@ -27,6 +27,26 @@ Be concise and helpful.""" # The bot cannot access files outside this directory allowed_directory = "/tmp/rustbot-sandbox" +[memory] +# Path to the SQLite database file for persistent memory +# Stores conversations, knowledge base, and vector embeddings +database_path = "rustbot.db" + +[skills] +# Directory containing skill markdown files +# Skills are natural-language instructions loaded at startup +directory = "skills" + +# Embedding API for vector search (optional) +# When configured, enables hybrid vector + FTS5 search for memory. +# Without this, falls back to FTS5 keyword search only. +# Works with any OpenAI-compatible /v1/embeddings endpoint. +# [embedding] +# api_key = "YOUR_OPENROUTER_API_KEY" +# base_url = "https://openrouter.ai/api/v1" +# model = "qwen/qwen3-embedding-8b" +# dimensions = 1536 + # MCP Server Configurations # Each [[mcp_servers]] block defines an MCP server to connect to # The bot will discover and register tools from each server diff --git a/docs/plans/2026-02-16-ai-agent-framework.md b/docs/plans/2026-02-16-ai-agent-framework.md new file mode 100644 index 0000000..05b3844 --- /dev/null +++ b/docs/plans/2026-02-16-ai-agent-framework.md @@ -0,0 +1,2171 @@ +# AI Agent Framework Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 
+ +**Goal:** Transform RustBot from a Telegram-only AI chatbot into a modular AI agent framework with persistent memory, markdown-based skills, multi-platform support, and proactive background task scheduling. + +**Architecture:** Modular monolith with Cargo feature flags. Core agent logic extracted from `bot.rs` into a platform-agnostic `agent` module. Platform adapters (Telegram first, Discord-ready) implement a `Platform` trait. SQLite with FTS5 provides persistent conversations, knowledge base, and learning. Skills are natural-language markdown files loaded at runtime. Tokio-cron-scheduler handles background tasks. + +**Tech Stack:** Rust 2021, tokio (async runtime), rusqlite (SQLite + FTS5), teloxide (Telegram), serenity/poise (Discord, future), tokio-cron-scheduler, serde/toml (config), rmcp (MCP), reqwest (HTTP/LLM). + +--- + +## Target Module Structure + +``` +src/ +├── main.rs # Entry point — initializes all subsystems +├── config.rs # Extended config (+ discord, memory, scheduler sections) +├── llm.rs # LLM client (OpenRouter) — unchanged +├── mcp.rs # MCP server manager — unchanged +├── tools.rs # Built-in sandbox tools — unchanged +├── platform/ +│ ├── mod.rs # Platform trait, IncomingMessage, OutgoingMessage types +│ └── telegram.rs # Telegram adapter (refactored from bot.rs) +├── memory/ +│ ├── mod.rs # MemoryStore struct, init, migrations +│ ├── conversations.rs # Save/load/search conversations +│ └── knowledge.rs # Knowledge base with FTS5 search +├── skills/ +│ ├── mod.rs # SkillRegistry, Skill struct +│ └── loader.rs # Load .md skill files from directory +├── scheduler/ +│ ├── mod.rs # Scheduler wrapper around tokio-cron-scheduler +│ └── tasks.rs # Reminder and heartbeat task types +└── agent.rs # Core agentic loop — platform-agnostic +``` + +--- + +## Phase 1: SQLite Memory Foundation + +### Task 1: Add SQLite Dependencies + +**Files:** +- Modify: `Cargo.toml:6-34` + +**Step 1: Add rusqlite dependency** + +Add to `[dependencies]` section in 
`Cargo.toml`: + +```toml +# SQLite database with FTS5 +rusqlite = { version = "0.34", features = ["bundled", "modern-full"] } + +# Chrono for timestamps +chrono = { version = "0.4", features = ["serde"] } + +# UUID for message IDs +uuid = { version = "1", features = ["v4"] } +``` + +**Step 2: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully (downloads new crates) + +**Step 3: Commit** + +```bash +git add Cargo.toml Cargo.lock +git commit -m "feat: add rusqlite, chrono, uuid dependencies for memory system" +``` + +--- + +### Task 2: Create Memory Module — Schema & Initialization + +**Files:** +- Create: `src/memory/mod.rs` +- Create: `src/memory/conversations.rs` +- Create: `src/memory/knowledge.rs` +- Modify: `src/main.rs:1-5` (add `mod memory;`) + +**Step 1: Create `src/memory/mod.rs`** + +```rust +pub mod conversations; +pub mod knowledge; + +use anyhow::{Context, Result}; +use rusqlite::Connection; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::info; + +/// Thread-safe SQLite memory store +#[derive(Clone)] +pub struct MemoryStore { + conn: Arc<Mutex<Connection>>, +} + +impl MemoryStore { + /// Open or create the SQLite database at the given path + pub fn open(path: &Path) -> Result<Self> { + let conn = Connection::open(path) + .with_context(|| format!("Failed to open database: {}", path.display()))?; + + // Enable WAL mode for better concurrent read performance + conn.execute_batch("PRAGMA journal_mode=WAL;")?; + conn.execute_batch("PRAGMA foreign_keys=ON;")?; + + let store = Self { + conn: Arc::new(Mutex::new(conn)), + }; + + store.run_migrations_sync()?; + info!("Memory store initialized at: {}", path.display()); + Ok(store) + } + + /// Open an in-memory database (for testing) + pub fn open_in_memory() -> Result<Self> { + let conn = Connection::open_in_memory()?; + conn.execute_batch("PRAGMA foreign_keys=ON;")?; + let store = Self { + conn: Arc::new(Mutex::new(conn)), + }; + store.run_migrations_sync()?; + Ok(store) + } + + fn 
run_migrations_sync(&self) -> Result<()> { + // We need to block on the mutex here since this is called from sync context + // This is safe because it's only called during initialization + let conn = self.conn.blocking_lock(); + conn.execute_batch( + " + CREATE TABLE IF NOT EXISTS conversations ( + id TEXT PRIMARY KEY, + platform TEXT NOT NULL, + user_id TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + conversation_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + tool_calls TEXT, + tool_call_id TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY (conversation_id) REFERENCES conversations(id) + ); + + CREATE INDEX IF NOT EXISTS idx_messages_conversation + ON messages(conversation_id, created_at); + + CREATE INDEX IF NOT EXISTS idx_conversations_user + ON conversations(platform, user_id, updated_at); + + CREATE TABLE IF NOT EXISTS knowledge ( + id TEXT PRIMARY KEY, + category TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + source TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_key + ON knowledge(category, key); + + -- FTS5 virtual table for full-text search across messages + CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5( + content, + content=messages, + content_rowid=rowid + ); + + -- FTS5 virtual table for knowledge base search + CREATE VIRTUAL TABLE IF NOT EXISTS knowledge_fts USING fts5( + key, + value, + content=knowledge, + content_rowid=rowid + ); + + -- Triggers to keep FTS in sync + CREATE TRIGGER IF NOT EXISTS messages_fts_insert AFTER INSERT ON messages + WHEN NEW.content IS NOT NULL BEGIN + INSERT INTO messages_fts(rowid, content) VALUES (NEW.rowid, NEW.content); + END; + + CREATE TRIGGER IF NOT EXISTS messages_fts_delete AFTER DELETE ON 
messages + WHEN OLD.content IS NOT NULL BEGIN + INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', OLD.rowid, OLD.content); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_insert AFTER INSERT ON knowledge BEGIN + INSERT INTO knowledge_fts(rowid, key, value) VALUES (NEW.rowid, NEW.key, NEW.value); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_delete AFTER DELETE ON knowledge BEGIN + INSERT INTO knowledge_fts(knowledge_fts, rowid, key, value) VALUES('delete', OLD.rowid, OLD.key, OLD.value); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_update AFTER UPDATE ON knowledge BEGIN + INSERT INTO knowledge_fts(knowledge_fts, rowid, key, value) VALUES('delete', OLD.rowid, OLD.key, OLD.value); + INSERT INTO knowledge_fts(rowid, key, value) VALUES (NEW.rowid, NEW.key, NEW.value); + END; + ", + )?; + Ok(()) + } +} +``` + +**Step 2: Create `src/memory/conversations.rs`** + +```rust +use anyhow::{Context, Result}; +use uuid::Uuid; + +use super::MemoryStore; +use crate::llm::ChatMessage; + +impl MemoryStore { + /// Get or create a conversation for a platform user + pub async fn get_or_create_conversation( + &self, + platform: &str, + user_id: &str, + ) -> Result { + let conn = self.conn.lock().await; + + // Try to find an existing active conversation + let existing: Option = conn + .query_row( + "SELECT id FROM conversations + WHERE platform = ?1 AND user_id = ?2 + ORDER BY updated_at DESC LIMIT 1", + rusqlite::params![platform, user_id], + |row| row.get(0), + ) + .ok(); + + if let Some(id) = existing { + return Ok(id); + } + + // Create a new conversation + let id = Uuid::new_v4().to_string(); + conn.execute( + "INSERT INTO conversations (id, platform, user_id) VALUES (?1, ?2, ?3)", + rusqlite::params![&id, platform, user_id], + ) + .context("Failed to create conversation")?; + + Ok(id) + } + + /// Save a message to a conversation + pub async fn save_message( + &self, + conversation_id: &str, + message: &ChatMessage, + ) -> Result { + let 
conn = self.conn.lock().await; + let id = Uuid::new_v4().to_string(); + + let tool_calls_json = message + .tool_calls + .as_ref() + .map(|tc| serde_json::to_string(tc).unwrap_or_default()); + + conn.execute( + "INSERT INTO messages (id, conversation_id, role, content, tool_calls, tool_call_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + rusqlite::params![ + &id, + conversation_id, + &message.role, + &message.content, + &tool_calls_json, + &message.tool_call_id, + ], + ) + .context("Failed to save message")?; + + // Update conversation timestamp + conn.execute( + "UPDATE conversations SET updated_at = datetime('now') WHERE id = ?1", + rusqlite::params![conversation_id], + )?; + + Ok(id) + } + + /// Load all messages for a conversation + pub async fn load_messages(&self, conversation_id: &str) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT role, content, tool_calls, tool_call_id + FROM messages + WHERE conversation_id = ?1 + ORDER BY created_at ASC", + )?; + + let messages = stmt + .query_map(rusqlite::params![conversation_id], |row| { + let tool_calls_json: Option = row.get(2)?; + let tool_calls = tool_calls_json.and_then(|json| serde_json::from_str(&json).ok()); + + Ok(ChatMessage { + role: row.get(0)?, + content: row.get(1)?, + tool_calls, + tool_call_id: row.get(3)?, + }) + })? 
+ .collect::, _>>() + .context("Failed to load messages")?; + + Ok(messages) + } + + /// Clear a conversation (delete all its messages) + pub async fn clear_conversation(&self, platform: &str, user_id: &str) -> Result<()> { + let conn = self.conn.lock().await; + + conn.execute( + "DELETE FROM messages WHERE conversation_id IN ( + SELECT id FROM conversations WHERE platform = ?1 AND user_id = ?2 + )", + rusqlite::params![platform, user_id], + )?; + + conn.execute( + "DELETE FROM conversations WHERE platform = ?1 AND user_id = ?2", + rusqlite::params![platform, user_id], + )?; + + Ok(()) + } + + /// Search messages using FTS5 full-text search + pub async fn search_messages(&self, query: &str, limit: usize) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT m.role, m.content, m.tool_calls, m.tool_call_id + FROM messages m + JOIN messages_fts fts ON m.rowid = fts.rowid + WHERE messages_fts MATCH ?1 + ORDER BY rank + LIMIT ?2", + )?; + + let messages = stmt + .query_map(rusqlite::params![query, limit as i64], |row| { + let tool_calls_json: Option = row.get(2)?; + let tool_calls = tool_calls_json.and_then(|json| serde_json::from_str(&json).ok()); + + Ok(ChatMessage { + role: row.get(0)?, + content: row.get(1)?, + tool_calls, + tool_call_id: row.get(3)?, + }) + })? 
+ .collect::<Result<Vec<_>, _>>() + .context("Failed to search messages")?; + + Ok(messages) + } +} +``` + +**Step 3: Create `src/memory/knowledge.rs`** + +```rust +use anyhow::{Context, Result}; +use uuid::Uuid; + +use super::MemoryStore; + +/// A knowledge entry the agent has learned +#[derive(Debug, Clone)] +pub struct KnowledgeEntry { + pub id: String, + pub category: String, + pub key: String, + pub value: String, + pub source: Option<String>, +} + +impl MemoryStore { + /// Store or update a knowledge entry + pub async fn remember( + &self, + category: &str, + key: &str, + value: &str, + source: Option<&str>, + ) -> Result<()> { + let conn = self.conn.lock().await; + let id = Uuid::new_v4().to_string(); + + conn.execute( + "INSERT INTO knowledge (id, category, key, value, source) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(category, key) DO UPDATE SET + value = excluded.value, + source = excluded.source, + updated_at = datetime('now')", + rusqlite::params![&id, category, key, value, source], + ) + .context("Failed to store knowledge")?; + + Ok(()) + } + + /// Recall a specific knowledge entry + pub async fn recall(&self, category: &str, key: &str) -> Result<Option<String>> { + let conn = self.conn.lock().await; + let result = conn + .query_row( + "SELECT value FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + |row| row.get(0), + ) + .ok(); + + Ok(result) + } + + /// Search knowledge using FTS5 + pub async fn search_knowledge( + &self, + query: &str, + limit: usize, + ) -> Result<Vec<KnowledgeEntry>> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT k.id, k.category, k.key, k.value, k.source + FROM knowledge k + JOIN knowledge_fts fts ON k.rowid = fts.rowid + WHERE knowledge_fts MATCH ?1 + ORDER BY rank + LIMIT ?2", + )?; + + let entries = stmt + .query_map(rusqlite::params![query, limit as i64], |row| { + Ok(KnowledgeEntry { + id: row.get(0)?, + category: row.get(1)?, + key: row.get(2)?, + value: row.get(3)?, + source: row.get(4)?, + }) + })? 
+ .collect::, _>>() + .context("Failed to search knowledge")?; + + Ok(entries) + } + + /// List all knowledge in a category + pub async fn list_knowledge(&self, category: &str) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT id, category, key, value, source + FROM knowledge + WHERE category = ?1 + ORDER BY key", + )?; + + let entries = stmt + .query_map(rusqlite::params![category], |row| { + Ok(KnowledgeEntry { + id: row.get(0)?, + category: row.get(1)?, + key: row.get(2)?, + value: row.get(3)?, + source: row.get(4)?, + }) + })? + .collect::, _>>() + .context("Failed to list knowledge")?; + + Ok(entries) + } + + /// Forget a knowledge entry + pub async fn forget(&self, category: &str, key: &str) -> Result { + let conn = self.conn.lock().await; + let rows = conn.execute( + "DELETE FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + )?; + Ok(rows > 0) + } +} +``` + +**Step 4: Register module in `src/main.rs`** + +Add `mod memory;` to the module declarations at the top of `src/main.rs` (line 1). 
+ +**Step 5: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 6: Commit** + +```bash +git add src/memory/ src/main.rs +git commit -m "feat: add SQLite memory module with conversations, knowledge, and FTS5 search" +``` + +--- + +### Task 3: Extend Config for Memory and Database Path + +**Files:** +- Modify: `src/config.rs:6-12` (add memory config) +- Modify: `config.example.toml` (add memory section) + +**Step 1: Add MemoryConfig to `src/config.rs`** + +Add a new struct and field: + +```rust +#[derive(Debug, Deserialize, Clone)] +pub struct MemoryConfig { + #[serde(default = "default_db_path")] + pub database_path: PathBuf, +} + +fn default_db_path() -> PathBuf { + PathBuf::from("rustbot.db") +} +``` + +Add to the `Config` struct: + +```rust +#[serde(default = "default_memory_config")] +pub memory: MemoryConfig, +``` + +Add the default function: + +```rust +fn default_memory_config() -> MemoryConfig { + MemoryConfig { + database_path: default_db_path(), + } +} +``` + +**Step 2: Add `[memory]` section to `config.example.toml`** + +```toml +[memory] +# Path to the SQLite database file for persistent memory +database_path = "rustbot.db" +``` + +**Step 3: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 4: Commit** + +```bash +git add src/config.rs config.example.toml +git commit -m "feat: add memory configuration for SQLite database path" +``` + +--- + +### Task 4: Wire MemoryStore into AppState and Main + +**Files:** +- Modify: `src/main.rs:51-56` (initialize MemoryStore) +- Modify: `src/bot.rs:33-49` (add memory to AppState) + +**Step 1: Initialize MemoryStore in `src/main.rs`** + +After config loading (around line 49), add: + +```rust +use crate::memory::MemoryStore; + +// Initialize memory store +let memory = MemoryStore::open(&config.memory.database_path) + .context("Failed to initialize memory store")?; +info!(" Database: {}", config.memory.database_path.display()); +``` + +Pass it to 
AppState: + +```rust +let state = Arc::new(AppState::new(config, mcp_manager, memory)); +``` + +**Step 2: Add MemoryStore to AppState in `src/bot.rs`** + +Add field to `AppState`: + +```rust +pub struct AppState { + llm: LlmClient, + config: Config, + mcp: McpManager, + memory: MemoryStore, + conversations: Mutex>, +} +``` + +Update constructor: + +```rust +pub fn new(config: Config, mcp: McpManager, memory: MemoryStore) -> Self { + let llm = LlmClient::new(config.openrouter.clone()); + Self { + llm, + config, + mcp, + memory, + conversations: Mutex::new(HashMap::new()), + } +} +``` + +**Step 3: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 4: Commit** + +```bash +git add src/main.rs src/bot.rs +git commit -m "feat: wire MemoryStore into AppState and initialization" +``` + +--- + +### Task 5: Persist Conversations to SQLite + +**Files:** +- Modify: `src/bot.rs:171-266` (update `process_with_llm` to use MemoryStore) + +**Step 1: Refactor `process_with_llm` to persist messages** + +Replace the in-memory-only approach. At conversation start, load from SQLite. After each message (user, assistant, tool), save to SQLite. Keep in-memory cache for the active session but back it with persistence. + +In `process_with_llm`, change the conversation initialization to: + +```rust +// Get or create persistent conversation +let conversation_id = state + .memory + .get_or_create_conversation("telegram", &user_id.to_string()) + .await?; +``` + +After adding user message to in-memory, also persist: + +```rust +state.memory.save_message(&conversation_id, &user_msg).await?; +``` + +Same pattern for assistant messages and tool results — after adding to in-memory conversation, also call `state.memory.save_message(...)`. 
+ +For `/clear`, also call: + +```rust +state.memory.clear_conversation("telegram", &user_id.to_string()).await?; +``` + +**Step 2: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 3: Commit** + +```bash +git add src/bot.rs +git commit -m "feat: persist conversation messages to SQLite memory" +``` + +--- + +## Phase 2: Platform Abstraction Layer + +### Task 6: Define Platform Trait and Message Types + +**Files:** +- Create: `src/platform/mod.rs` +- Modify: `src/main.rs:1-5` (add `mod platform;`) + +**Step 1: Create `src/platform/mod.rs`** + +```rust +pub mod telegram; + +use anyhow::Result; +use async_trait::async_trait; + +/// A message received from any platform +#[derive(Debug, Clone)] +pub struct IncomingMessage { + /// Platform identifier (e.g., "telegram", "discord") + pub platform: String, + /// Platform-specific user ID as string + pub user_id: String, + /// Platform-specific chat/channel ID as string + pub chat_id: String, + /// Display name of the user + pub user_name: String, + /// The message text + pub text: String, +} + +/// A message to send back to a platform +#[derive(Debug, Clone)] +pub struct OutgoingMessage { + /// The chat/channel to send to + pub chat_id: String, + /// The message text + pub text: String, +} + +/// Trait that all platform adapters must implement +#[async_trait] +pub trait Platform: Send + Sync { + /// Platform name identifier + fn name(&self) -> &str; + + /// Start the platform's event loop. This should block until shutdown. + async fn run(&self) -> Result<()>; +} +``` + +**Step 2: Add `async-trait` dependency to `Cargo.toml`** + +```toml +# Async trait support +async-trait = "0.1" +``` + +**Step 3: Register module in `src/main.rs`** + +Add `mod platform;` to the module declarations. 
+ +**Step 4: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 5: Commit** + +```bash +git add src/platform/ src/main.rs Cargo.toml Cargo.lock +git commit -m "feat: add platform abstraction layer with trait and message types" +``` + +--- + +### Task 7: Extract Agent Core from bot.rs + +**Files:** +- Create: `src/agent.rs` +- Modify: `src/main.rs:1-5` (add `mod agent;`) + +**Step 1: Create `src/agent.rs`** + +Extract the platform-agnostic agentic loop. This is the brain of the system — receives an `IncomingMessage`, processes it through LLM + tools, returns response text. + +```rust +use anyhow::Result; +use std::sync::Arc; +use tracing::{error, info}; + +use crate::config::Config; +use crate::llm::{ChatMessage, LlmClient, ToolDefinition}; +use crate::mcp::McpManager; +use crate::memory::MemoryStore; +use crate::platform::IncomingMessage; +use crate::skills::SkillRegistry; +use crate::tools; + +/// The core agent that processes messages through LLM + tools +pub struct Agent { + pub llm: LlmClient, + pub config: Config, + pub mcp: McpManager, + pub memory: MemoryStore, + pub skills: SkillRegistry, +} + +impl Agent { + pub fn new( + config: Config, + mcp: McpManager, + memory: MemoryStore, + skills: SkillRegistry, + ) -> Self { + let llm = LlmClient::new(config.openrouter.clone()); + Self { + llm, + config, + mcp, + memory, + skills, + } + } + + /// Build the system prompt, incorporating loaded skills + fn build_system_prompt(&self) -> String { + let mut prompt = self.config.openrouter.system_prompt.clone(); + + let skill_context = self.skills.build_context(); + if !skill_context.is_empty() { + prompt.push_str("\n\n# Available Skills\n\n"); + prompt.push_str(&skill_context); + } + + prompt + } + + /// Process an incoming message and return the response text + pub async fn process_message(&self, incoming: &IncomingMessage) -> Result { + let platform = &incoming.platform; + let user_id = &incoming.user_id; + + // Get or create 
persistent conversation + let conversation_id = self + .memory + .get_or_create_conversation(platform, user_id) + .await?; + + // Load existing messages from memory + let mut messages = self.memory.load_messages(&conversation_id).await?; + + // If no messages yet, add system prompt + if messages.is_empty() { + let system_msg = ChatMessage { + role: "system".to_string(), + content: Some(self.build_system_prompt()), + tool_calls: None, + tool_call_id: None, + }; + self.memory + .save_message(&conversation_id, &system_msg) + .await?; + messages.push(system_msg); + } + + // Add user message + let user_msg = ChatMessage { + role: "user".to_string(), + content: Some(incoming.text.clone()), + tool_calls: None, + tool_call_id: None, + }; + self.memory + .save_message(&conversation_id, &user_msg) + .await?; + messages.push(user_msg); + + // Gather all tool definitions + let mut all_tools: Vec = tools::builtin_tool_definitions(); + all_tools.extend(self.mcp.tool_definitions()); + all_tools.extend(self.memory_tool_definitions()); + + // Agentic loop + let max_iterations = 10; + for iteration in 0..max_iterations { + let response = self.llm.chat(&messages, &all_tools).await?; + + if let Some(tool_calls) = &response.tool_calls { + if !tool_calls.is_empty() { + info!( + "LLM requested {} tool call(s) (iteration {})", + tool_calls.len(), + iteration + ); + + // Save assistant message with tool calls + self.memory + .save_message(&conversation_id, &response) + .await?; + messages.push(response.clone()); + + // Execute each tool call + for tool_call in tool_calls { + let arguments: serde_json::Value = + serde_json::from_str(&tool_call.function.arguments) + .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); + + let tool_result = + self.execute_tool(&tool_call.function.name, &arguments).await; + + info!( + "Tool '{}' result length: {} chars", + tool_call.function.name, + tool_result.len() + ); + + let tool_msg = ChatMessage { + role: "tool".to_string(), + content: 
Some(tool_result), + tool_calls: None, + tool_call_id: Some(tool_call.id.clone()), + }; + self.memory + .save_message(&conversation_id, &tool_msg) + .await?; + messages.push(tool_msg); + } + + continue; + } + } + + // Final response — no tool calls + let content = response.content.clone().unwrap_or_default(); + self.memory + .save_message(&conversation_id, &response) + .await?; + + return Ok(content); + } + + Ok("I've reached the maximum number of tool call iterations. Please try rephrasing your request.".to_string()) + } + + /// Clear conversation history for a user + pub async fn clear_conversation(&self, platform: &str, user_id: &str) -> Result<()> { + self.memory.clear_conversation(platform, user_id).await + } + + /// Get all tool definitions for display + pub fn all_tool_definitions(&self) -> Vec { + let mut all_tools = tools::builtin_tool_definitions(); + all_tools.extend(self.mcp.tool_definitions()); + all_tools.extend(self.memory_tool_definitions()); + all_tools + } + + /// Memory-related tool definitions exposed to the LLM + fn memory_tool_definitions(&self) -> Vec { + use crate::llm::FunctionDefinition; + use serde_json::json; + + vec![ + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "remember".to_string(), + description: "Store a piece of knowledge for long-term memory. 
Use this to remember user preferences, facts, or anything useful.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "category": { "type": "string", "description": "Category (e.g., 'user_preference', 'fact', 'project')" }, + "key": { "type": "string", "description": "Short identifier for this knowledge" }, + "value": { "type": "string", "description": "The knowledge to remember" } + }, + "required": ["category", "key", "value"] + }), + }, + }, + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "recall".to_string(), + description: "Retrieve a specific piece of remembered knowledge.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "category": { "type": "string", "description": "Category to search in" }, + "key": { "type": "string", "description": "The key to look up" } + }, + "required": ["category", "key"] + }), + }, + }, + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "search_memory".to_string(), + description: "Search through past conversations and knowledge using full-text search.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "query": { "type": "string", "description": "Search query" }, + "limit": { "type": "integer", "description": "Max results (default 5)" } + }, + "required": ["query"] + }), + }, + }, + ] + } + + /// Execute a tool call by routing to the right handler + async fn execute_tool(&self, name: &str, arguments: &serde_json::Value) -> String { + // Memory tools + match name { + "remember" => { + let category = arguments["category"].as_str().unwrap_or("general"); + let key = arguments["key"].as_str().unwrap_or(""); + let value = arguments["value"].as_str().unwrap_or(""); + match self.memory.remember(category, key, value, None).await { + Ok(()) => format!("Remembered: [{}] {} = {}", category, key, value), + Err(e) => format!("Failed to remember: {}", e), + } + } + "recall" => { + 
let category = arguments["category"].as_str().unwrap_or("general"); + let key = arguments["key"].as_str().unwrap_or(""); + match self.memory.recall(category, key).await { + Ok(Some(value)) => value, + Ok(None) => format!("No knowledge found for [{}] {}", category, key), + Err(e) => format!("Failed to recall: {}", e), + } + } + "search_memory" => { + let query = arguments["query"].as_str().unwrap_or(""); + let limit = arguments["limit"].as_u64().unwrap_or(5) as usize; + + let mut results = Vec::new(); + + // Search conversations + if let Ok(msgs) = self.memory.search_messages(query, limit).await { + for msg in msgs { + if let Some(content) = &msg.content { + results.push(format!("[{}]: {}", msg.role, content)); + } + } + } + + // Search knowledge + if let Ok(entries) = self.memory.search_knowledge(query, limit).await { + for entry in entries { + results.push(format!("[knowledge:{}] {} = {}", entry.category, entry.key, entry.value)); + } + } + + if results.is_empty() { + "No results found.".to_string() + } else { + results.join("\n\n") + } + } + _ if self.mcp.is_mcp_tool(name) => { + match self.mcp.call_tool(name, arguments).await { + Ok(result) => result, + Err(e) => format!("MCP tool error: {}", e), + } + } + _ => { + match tools::execute_builtin_tool(name, arguments, &self.config.sandbox.allowed_directory).await { + Ok(result) => result, + Err(e) => format!("Tool error: {}", e), + } + } + } + } +} +``` + +**Step 2: Register module in `src/main.rs`** + +Add `mod agent;` to the module declarations. + +**Step 3: Verify it compiles** + +Note: This will not compile yet because it references `crate::skills::SkillRegistry` which doesn't exist. We'll create a minimal stub in the next task. 
+ +**Step 4: Commit** + +```bash +git add src/agent.rs src/main.rs +git commit -m "feat: extract platform-agnostic agent core with memory tools" +``` + +--- + +## Phase 3: Skill System + +### Task 8: Create Skill Loader and Registry + +**Files:** +- Create: `src/skills/mod.rs` +- Create: `src/skills/loader.rs` +- Modify: `src/main.rs:1-5` (add `mod skills;`) + +**Step 1: Create `src/skills/mod.rs`** + +```rust +pub mod loader; + +use std::collections::HashMap; +use tracing::info; + +/// A loaded skill from a markdown file +#[derive(Debug, Clone)] +pub struct Skill { + /// Skill name (derived from filename or frontmatter) + pub name: String, + /// Short description + pub description: String, + /// Full markdown content (the instructions) + pub content: String, + /// Category/tags for organization + pub tags: Vec<String>, +} + +/// Registry of all loaded skills +#[derive(Debug, Clone)] +pub struct SkillRegistry { + skills: HashMap<String, Skill>, +} + +impl SkillRegistry { + pub fn new() -> Self { + Self { + skills: HashMap::new(), + } + } + + /// Register a skill + pub fn register(&mut self, skill: Skill) { + info!("Registered skill: {} — {}", skill.name, skill.description); + self.skills.insert(skill.name.clone(), skill); + } + + /// Get a skill by name + pub fn get(&self, name: &str) -> Option<&Skill> { + self.skills.get(name) + } + + /// List all registered skills + pub fn list(&self) -> Vec<&Skill> { + self.skills.values().collect() + } + + /// Build context string for the system prompt + /// This gives the LLM awareness of available skills + pub fn build_context(&self) -> String { + if self.skills.is_empty() { + return String::new(); + } + + let mut context = String::from("You have the following skills available. 
When relevant, follow these instructions:\n\n"); + for skill in self.skills.values() { + context.push_str(&format!("## Skill: {}\n", skill.name)); + context.push_str(&format!("{}\n\n", skill.content)); + } + context + } + + /// Count of registered skills + pub fn len(&self) -> usize { + self.skills.len() + } + + pub fn is_empty(&self) -> bool { + self.skills.is_empty() + } +} +``` + +**Step 2: Create `src/skills/loader.rs`** + +```rust +use anyhow::{Context, Result}; +use std::path::Path; +use tracing::{info, warn}; + +use super::{Skill, SkillRegistry}; + +/// Load all markdown skill files from a directory +/// +/// Skill format (markdown with optional YAML frontmatter): +/// +/// ```markdown +/// --- +/// name: my-skill +/// description: What this skill does +/// tags: [coding, review] +/// --- +/// +/// # Skill Instructions +/// +/// The full markdown content is the skill's instructions... +/// ``` +/// +/// If no frontmatter, the filename (without extension) is the name, +/// and the first heading or first line is the description. +pub async fn load_skills_from_dir(dir: &Path) -> Result<SkillRegistry> { + let mut registry = SkillRegistry::new(); + + if !dir.exists() { + info!("Skills directory not found: {}, skipping", dir.display()); + return Ok(registry); + } + + let mut entries = tokio::fs::read_dir(dir) + .await + .with_context(|| format!("Failed to read skills directory: {}", dir.display()))?; + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + + // Support .md files and directories containing SKILL.md + let skill_path = if path.is_dir() { + let skill_file = path.join("SKILL.md"); + if skill_file.exists() { + skill_file + } else { + continue; + } + } else if path.extension().and_then(|e| e.to_str()) == Some("md") { + path.clone() + } else { + continue; + }; + + match load_skill_file(&skill_path).await { + Ok(skill) => registry.register(skill), + Err(e) => warn!("Failed to load skill from {}: {}", skill_path.display(), e), + } + } + + info!("Loaded {} skills", registry.len()); + Ok(registry) +} + +async fn load_skill_file(path: &Path) -> Result<Skill> { + let content = tokio::fs::read_to_string(path) + .await + .with_context(|| format!("Failed to read skill file: {}", path.display()))?; + + // Try to parse YAML frontmatter + if content.starts_with("---") { + if let Some(end) = content[3..].find("---") { + let frontmatter = &content[3..3 + end].trim(); + let body = content[3 + end + 3..].trim().to_string(); + + let name = extract_field(frontmatter, "name"); + let description = extract_field(frontmatter, "description"); + let tags = extract_list_field(frontmatter, "tags"); + + let skill_name = name.unwrap_or_else(|| name_from_path(path)); + + return Ok(Skill { + name: skill_name, + description: description.unwrap_or_else(|| first_line_or_heading(&body)), + content: body, + tags, + }); + } + } + + // No frontmatter — derive metadata from content + let name = name_from_path(path); + let description = first_line_or_heading(&content); + + Ok(Skill { + name, + description, + content: content.to_string(), + tags: Vec::new(), + }) +} + +/// Extract a simple `key: value` from YAML-like frontmatter +fn extract_field(frontmatter: &str, key: &str) -> Option<String> { + for line in frontmatter.lines() { + let line = line.trim(); + if let Some(rest) = line.strip_prefix(&format!("{}:", key)) { + let value = rest.trim().trim_matches('"').trim_matches('\''); + if !value.is_empty() { + return 
Some(value.to_string()); + } + } + } + None +} + +/// Extract a simple `key: [a, b, c]` list from frontmatter +fn extract_list_field(frontmatter: &str, key: &str) -> Vec<String> { + for line in frontmatter.lines() { + let line = line.trim(); + if let Some(rest) = line.strip_prefix(&format!("{}:", key)) { + let rest = rest.trim(); + if rest.starts_with('[') && rest.ends_with(']') { + return rest[1..rest.len() - 1] + .split(',') + .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string()) + .filter(|s| !s.is_empty()) + .collect(); + } + } + } + Vec::new() +} + +/// Derive skill name from file path +fn name_from_path(path: &Path) -> String { + // If it's SKILL.md inside a directory, use the directory name + if path.file_name().and_then(|f| f.to_str()) == Some("SKILL.md") { + if let Some(parent) = path.parent() { + if let Some(dir_name) = parent.file_name().and_then(|f| f.to_str()) { + return dir_name.to_string(); + } + } + } + // Otherwise use the file stem + path.file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unnamed") + .to_string() +} + +/// Get the first heading or first line as a description +fn first_line_or_heading(content: &str) -> String { + for line in content.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + if let Some(heading) = line.strip_prefix('#') { + return heading.trim().trim_start_matches('#').trim().to_string(); + } + return line.to_string(); + } + "No description".to_string() +} +``` + +**Step 3: Register module in `src/main.rs`** + +Add `mod skills;` to the module declarations. 
+ +**Step 4: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 5: Commit** + +```bash +git add src/skills/ src/main.rs +git commit -m "feat: add skill system with markdown loader and registry" +``` + +--- + +### Task 9: Add Skills Config and Wire Skills Loading + +**Files:** +- Modify: `src/config.rs` (add SkillsConfig) +- Modify: `src/main.rs` (load skills during init) +- Modify: `config.example.toml` (add skills section) + +**Step 1: Add SkillsConfig to `src/config.rs`** + +```rust +#[derive(Debug, Deserialize, Clone)] +pub struct SkillsConfig { + #[serde(default = "default_skills_dir")] + pub directory: PathBuf, +} + +fn default_skills_dir() -> PathBuf { + PathBuf::from("skills") +} + +fn default_skills_config() -> SkillsConfig { + SkillsConfig { + directory: default_skills_dir(), + } +} +``` + +Add to `Config` struct: + +```rust +#[serde(default = "default_skills_config")] +pub skills: SkillsConfig, +``` + +**Step 2: Load skills in `src/main.rs`** + +After memory initialization: + +```rust +use crate::skills::loader::load_skills_from_dir; + +// Load skills +let skills = load_skills_from_dir(&config.skills.directory).await?; +info!(" Skills: {}", skills.len()); +``` + +**Step 3: Add to `config.example.toml`** + +```toml +[skills] +# Directory containing skill markdown files +directory = "skills" +``` + +**Step 4: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 5: Commit** + +```bash +git add src/config.rs src/main.rs config.example.toml +git commit -m "feat: add skills configuration and loading at startup" +``` + +--- + +## Phase 4: Refactor Telegram to Use Agent + Platform + +### Task 10: Refactor Telegram as a Platform Adapter + +**Files:** +- Create: `src/platform/telegram.rs` +- Modify: `src/bot.rs` (slim down to delegation) +- Modify: `src/main.rs` (pass Agent to bot) + +**Step 1: Create `src/platform/telegram.rs`** + +```rust +use std::sync::Arc; + +use anyhow::Result; +use 
teloxide::prelude::*; +use tracing::{error, info, warn}; + +use crate::agent::Agent; +use crate::platform::IncomingMessage; + +/// Split long messages for Telegram's 4096 char limit +fn split_message(text: &str, max_len: usize) -> Vec<String> { + if text.len() <= max_len { + return vec![text.to_string()]; + } + + let mut chunks = Vec::new(); + let mut start = 0; + + while start < text.len() { + let end = (start + max_len).min(text.len()); + let actual_end = if end < text.len() { + text[start..end] + .rfind('\n') + .or_else(|| text[start..end].rfind(' ')) + .map(|pos| start + pos + 1) + .unwrap_or(end) + } else { + end + }; + + chunks.push(text[start..actual_end].to_string()); + start = actual_end; + } + + chunks +} + +/// Run the Telegram bot platform +pub async fn run(agent: Arc<Agent>, allowed_user_ids: Vec<u64>, bot_token: &str) -> Result<()> { + let bot = Bot::new(bot_token); + + info!("Starting Telegram platform..."); + + let handler = Update::filter_message() + .filter_map(move |msg: Message| { + let user = msg.from.as_ref()?; + if allowed_user_ids.contains(&user.id.0) { + Some(msg) + } else { + None + } + }) + .endpoint(handle_message); + + Dispatcher::builder(bot, handler) + .dependencies(dptree::deps![agent]) + .default_handler(|upd| async move { + warn!("Unhandled update: {:?}", upd.id); + }) + .error_handler(LoggingErrorHandler::with_custom_text("telegram")) + .build() + .dispatch() + .await; + + Ok(()) +} + +async fn handle_message(bot: Bot, msg: Message, agent: Arc<Agent>) -> ResponseResult<()> { + let user = match msg.from.as_ref() { + Some(user) => user, + None => return Ok(()), + }; + + let user_id = user.id.0; + let text = match msg.text() { + Some(t) => t.to_string(), + None => return Ok(()), + }; + + let user_name = user + .first_name + .clone(); + + info!("Telegram message from {} ({}): {}", user_name, user_id, text); + + // Handle commands + if text == "/clear" { + if let Err(e) = agent.clear_conversation("telegram", &user_id.to_string()).await { + error!("Failed to 
clear conversation: {}", e); + } + bot.send_message(msg.chat.id, "Conversation cleared.").await?; + return Ok(()); + } + + if text == "/start" { + bot.send_message( + msg.chat.id, + "Hello! I'm your AI assistant. Send me a message and I'll help you.\n\n\ + Commands:\n\ + /clear - Clear conversation history\n\ + /tools - List available tools", + ) + .await?; + return Ok(()); + } + + if text == "/tools" { + let all_tools = agent.all_tool_definitions(); + let mut tool_list = String::from("Available tools:\n\n"); + for tool in &all_tools { + tool_list.push_str(&format!(" - {}: {}\n", tool.function.name, tool.function.description)); + } + bot.send_message(msg.chat.id, tool_list).await?; + return Ok(()); + } + + // Send "typing" indicator + bot.send_chat_action(msg.chat.id, teloxide::types::ChatAction::Typing) + .await + .ok(); + + // Build platform-agnostic message + let incoming = IncomingMessage { + platform: "telegram".to_string(), + user_id: user_id.to_string(), + chat_id: msg.chat.id.0.to_string(), + user_name, + text, + }; + + // Process through agent + match agent.process_message(&incoming).await { + Ok(response) => { + for chunk in split_message(&response, 4000) { + bot.send_message(msg.chat.id, chunk).await.ok(); + } + } + Err(e) => { + error!("Error processing message: {:#}", e); + bot.send_message(msg.chat.id, format!("Error: {}", e)).await?; + } + } + + Ok(()) +} +``` + +**Step 2: Update `src/main.rs` to use Agent and Telegram platform** + +Replace the AppState creation and bot::run with: + +```rust +use crate::agent::Agent; + +// Create the agent +let agent = Arc::new(Agent::new(config.clone(), mcp_manager, memory, skills)); + +// Run Telegram platform +info!("Bot is starting..."); +platform::telegram::run( + agent, + config.telegram.allowed_user_ids.clone(), + &config.telegram.bot_token, +).await?; +``` + +**Step 3: Deprecate `src/bot.rs`** + +Remove `mod bot;` from `src/main.rs`. The file `src/bot.rs` can be deleted or kept for reference. 
All its logic now lives in `src/agent.rs` and `src/platform/telegram.rs`. + +**Step 4: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 5: Commit** + +```bash +git add src/platform/telegram.rs src/main.rs +git rm src/bot.rs # or just remove mod bot; and delete later +git commit -m "feat: refactor Telegram as platform adapter using Agent core" +``` + +--- + +## Phase 5: Background Task Scheduler + +### Task 11: Add Scheduler Dependencies + +**Files:** +- Modify: `Cargo.toml` + +**Step 1: Add tokio-cron-scheduler dependency** + +```toml +# Background task scheduler +tokio-cron-scheduler = "0.13" +``` + +**Step 2: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 3: Commit** + +```bash +git add Cargo.toml Cargo.lock +git commit -m "feat: add tokio-cron-scheduler dependency" +``` + +--- + +### Task 12: Create Scheduler Module + +**Files:** +- Create: `src/scheduler/mod.rs` +- Create: `src/scheduler/tasks.rs` +- Modify: `src/main.rs` (add `mod scheduler;`) + +**Step 1: Create `src/scheduler/mod.rs`** + +```rust +pub mod tasks; + +use anyhow::{Context, Result}; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::{error, info}; + +/// Wrapper around tokio-cron-scheduler +pub struct Scheduler { + inner: JobScheduler, +} + +impl Scheduler { + /// Create a new scheduler + pub async fn new() -> Result<Self> { + let inner = JobScheduler::new() + .await + .context("Failed to create job scheduler")?; + Ok(Self { inner }) + } + + /// Add a cron job + pub async fn add_cron_job<F>(&self, cron_expr: &str, name: &str, task: F) -> Result<()> + where + F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync + 'static, + { + let job_name = name.to_string(); + let job = Job::new_async(cron_expr, move |_uuid, _lock| { + let name = job_name.clone(); + let fut = task(); + Box::pin(async move { + info!("Running scheduled task: {}", name); + fut.await; + }) + }) + .with_context(|| format!("Failed to create cron job: {}", 
name))?; + + self.inner + .add(job) + .await + .with_context(|| format!("Failed to add job: {}", name))?; + + info!("Scheduled task '{}' with cron: {}", name, cron_expr); + Ok(()) + } + + /// Add a one-shot delayed job + pub async fn add_delayed_job<F>( + &self, + delay: std::time::Duration, + name: &str, + task: F, + ) -> Result<()> + where + F: FnOnce() -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync + 'static, + { + let job_name = name.to_string(); + let job = Job::new_one_shot_async(delay, move |_uuid, _lock| { + let name = job_name.clone(); + let fut = task(); + Box::pin(async move { + info!("Running one-shot task: {}", name); + fut.await; + }) + }) + .with_context(|| format!("Failed to create one-shot job: {}", name))?; + + self.inner + .add(job) + .await + .with_context(|| format!("Failed to add one-shot job: {}", name))?; + + info!("Scheduled one-shot task '{}' in {:?}", name, delay); + Ok(()) + } + + /// Start the scheduler + pub async fn start(&self) -> Result<()> { + self.inner + .start() + .await + .context("Failed to start scheduler")?; + info!("Scheduler started"); + Ok(()) + } + + /// Shutdown the scheduler + pub async fn shutdown(&mut self) -> Result<()> { + self.inner + .shutdown() + .await + .context("Failed to shutdown scheduler")?; + info!("Scheduler stopped"); + Ok(()) + } +} +``` + +**Step 2: Create `src/scheduler/tasks.rs`** + +```rust +use std::sync::Arc; + +use tracing::info; + +use crate::memory::MemoryStore; +use crate::scheduler::Scheduler; + +/// Register built-in background tasks +pub async fn register_builtin_tasks( + scheduler: &Scheduler, + memory: MemoryStore, +) -> anyhow::Result<()> { + // Heartbeat — log that the bot is alive every hour + scheduler + .add_cron_job("0 0 * * * *", "heartbeat", || { + Box::pin(async { + info!("Heartbeat: bot is alive"); + }) + }) + .await?; + + // Example: periodic memory cleanup could be added here + // scheduler.add_cron_job("0 0 3 * * *", "memory-cleanup", move || { ... 
}).await?; + + Ok(()) +} +``` + +**Step 3: Register module in `src/main.rs`** + +Add `mod scheduler;`. + +**Step 4: Wire scheduler in `src/main.rs`** + +After agent creation, before platform start: + +```rust +use crate::scheduler::Scheduler; +use crate::scheduler::tasks::register_builtin_tasks; + +// Initialize scheduler +let scheduler = Scheduler::new().await?; +register_builtin_tasks(&scheduler, memory.clone()).await?; +scheduler.start().await?; +info!(" Scheduler: active"); +``` + +**Step 5: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 6: Commit** + +```bash +git add src/scheduler/ src/main.rs +git commit -m "feat: add background task scheduler with tokio-cron-scheduler" +``` + +--- + +## Phase 6: Feature Flags + +### Task 13: Add Cargo Feature Flags + +**Files:** +- Modify: `Cargo.toml` + +**Step 1: Add feature flags to `Cargo.toml`** + +```toml +[features] +default = ["telegram", "memory", "skills", "scheduler"] +telegram = ["dep:teloxide"] +# discord = ["dep:serenity", "dep:poise"] # Future +memory = ["dep:rusqlite", "dep:uuid"] +skills = [] +scheduler = ["dep:tokio-cron-scheduler"] +``` + +Update dependencies to be optional where appropriate: + +```toml +teloxide = { version = "0.17", features = ["macros"], optional = true } +rusqlite = { version = "0.34", features = ["bundled", "modern_full"], optional = true } +uuid = { version = "1", features = ["v4"], optional = true } +tokio-cron-scheduler = { version = "0.13", optional = true } +``` + +**Step 2: Add `#[cfg(feature = "...")]` guards to modules in `src/main.rs`** + +```rust +#[cfg(feature = "memory")] +mod memory; +#[cfg(feature = "telegram")] +mod platform; +#[cfg(feature = "skills")] +mod skills; +#[cfg(feature = "scheduler")] +mod scheduler; +``` + +**Step 3: Verify it compiles with all features** + +Run: `cargo check --all-features` +Expected: Compiles successfully + +**Step 4: Commit** + +```bash +git add Cargo.toml src/main.rs +git commit -m "feat: add 
cargo feature flags for modular compilation" +``` + +--- + +## Phase 7: Create Example Agent Skills + +### Task 14: Create Bot Agent Skills Directory with Example Skills + +**Files:** +- Create: `skills/coding-assistant.md` +- Create: `skills/memory-manager.md` + +**Step 1: Create `skills/coding-assistant.md`** + +```markdown +--- +name: coding-assistant +description: Help users write, review, and debug code +tags: [coding, development] +--- + +# Coding Assistant + +When the user asks for help with code: + +1. **Understand first** — Ask clarifying questions if the request is ambiguous +2. **Read before writing** — Use read_file to understand existing code before modifying +3. **Small changes** — Make focused, minimal changes. Don't refactor unrelated code +4. **Explain your reasoning** — Briefly explain what you changed and why +5. **Test awareness** — Suggest how to test the changes if applicable + +When reviewing code: +- Point out bugs, security issues, and performance problems +- Suggest improvements but don't over-engineer +- Be specific — reference line numbers and provide fixed code + +When debugging: +- Ask for error messages and reproduction steps +- Use execute_command to investigate (check logs, run tests) +- Explain the root cause, not just the fix +``` + +**Step 2: Create `skills/memory-manager.md`** + +```markdown +--- +name: memory-manager +description: Proactively remember and recall useful information +tags: [memory, learning] +--- + +# Memory Manager + +You have persistent memory tools: `remember`, `recall`, and `search_memory`. + +## When to Remember + +Proactively use the `remember` tool when: +- The user tells you their name, preferences, or important context +- You learn something about their project or workflow +- The user corrects you — remember the correction +- You discover useful facts during tool use + +Categories to use: +- `user_preference` — User's stated preferences (language, style, etc.) +- `user_info` — Name, role, timezone, etc. 
+- `project` — Project-specific knowledge (architecture, conventions) +- `correction` — Things the user corrected you about +- `fact` — General facts learned during conversation + +## When to Recall + +- At the start of conversations, search memory for relevant user context +- Before making assumptions, check if you've remembered something relevant +- When the user references something from a past conversation +``` + +**Step 3: Commit** + +```bash +git add skills/ +git commit -m "feat: add example agent skills for coding assistant and memory manager" +``` + +--- + +## Phase 8: Final Integration and Cleanup + +### Task 15: Update main.rs with Full Initialization Flow + +**Files:** +- Modify: `src/main.rs` (complete rewrite of main function) + +**Step 1: Write the complete `src/main.rs`** + +```rust +mod agent; +mod config; +mod llm; +mod mcp; +mod memory; +mod platform; +mod scheduler; +mod skills; +mod tools; + +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use tracing::info; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::agent::Agent; +use crate::config::Config; +use crate::mcp::McpManager; +use crate::memory::MemoryStore; +use crate::scheduler::Scheduler; +use crate::scheduler::tasks::register_builtin_tasks; +use crate::skills::loader::load_skills_from_dir; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,rustbot=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Load configuration + let config_path = std::env::args() + .nth(1) + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("config.toml")); + + info!("Loading configuration from: {}", config_path.display()); + let config = Config::load(&config_path) + .with_context(|| format!("Failed to load config from {}", config_path.display()))?; + + 
info!("Configuration loaded successfully"); + info!(" Model: {}", config.openrouter.model); + info!(" Sandbox: {}", config.sandbox.allowed_directory.display()); + info!(" Allowed users: {:?}", config.telegram.allowed_user_ids); + info!(" MCP servers: {}", config.mcp_servers.len()); + + // Initialize memory store + let memory = MemoryStore::open(&config.memory.database_path) + .context("Failed to initialize memory store")?; + info!(" Database: {}", config.memory.database_path.display()); + + // Initialize MCP connections + let mut mcp_manager = McpManager::new(); + mcp_manager.connect_all(&config.mcp_servers).await; + + // Load skills + let skills = load_skills_from_dir(&config.skills.directory).await?; + info!(" Skills: {}", skills.len()); + + // Create the agent + let agent = Arc::new(Agent::new(config.clone(), mcp_manager, memory.clone(), skills)); + + // Initialize scheduler + let scheduler = Scheduler::new().await?; + register_builtin_tasks(&scheduler, memory).await?; + scheduler.start().await?; + info!(" Scheduler: active"); + + // Run the Telegram platform + info!("Bot is starting..."); + platform::telegram::run( + agent, + config.telegram.allowed_user_ids.clone(), + &config.telegram.bot_token, + ) + .await?; + + Ok(()) +} +``` + +**Step 2: Delete `src/bot.rs`** + +```bash +rm src/bot.rs +``` + +**Step 3: Verify it compiles** + +Run: `cargo check` +Expected: Compiles successfully + +**Step 4: Commit** + +```bash +git add src/main.rs +git rm src/bot.rs +git commit -m "feat: complete AI agent framework integration — replace bot.rs with modular architecture" +``` + +--- + +### Task 16: Verify Full Build and Test + +**Step 1: Full build** + +Run: `cargo build` +Expected: Builds successfully with no errors + +**Step 2: Check for warnings** + +Run: `cargo build 2>&1 | grep -i warning` +Expected: Minimal/no warnings + +**Step 3: Run clippy** + +Run: `cargo clippy -- -W clippy::all` +Expected: No errors (warnings OK for now) + +**Step 4: Final commit if any fixes 
needed** + +```bash +git add -A +git commit -m "fix: address clippy warnings and build issues" +``` + +**Step 5: Push to branch** + +```bash +git push -u origin claude/ai-agent-framework-BEBZn +``` + +--- + +## Summary of New Module Responsibilities + +| Module | Purpose | +|--------|---------| +| `agent.rs` | Core agentic loop — platform-agnostic message processing with LLM + tools | +| `memory/` | SQLite-backed persistent conversations + knowledge base with FTS5 | +| `platform/` | Platform adapters (Telegram now, Discord-ready) via `Platform` trait | +| `skills/` | Load markdown skill files → inject into system prompt | +| `scheduler/` | Background tasks via tokio-cron-scheduler (heartbeat, reminders) | + +## Architecture Diagram + +``` +┌─────────────────────────────────────────────────────┐ +│ main.rs │ +│ Config → Memory → MCP → Skills → Agent → Platform │ +└─────────────────────┬───────────────────────────────┘ + │ + ┌───────────┼───────────┐ + ▼ ▼ ▼ + ┌──────────┐ ┌─────────┐ ┌──────────┐ + │ Telegram │ │ Agent │ │Scheduler │ + │ Platform │ │ Core │ │ Tasks │ + └────┬─────┘ └────┬────┘ └──────────┘ + │ │ + │ ┌──────┼──────┐ + │ ▼ ▼ ▼ + │ ┌─────┐ ┌────┐ ┌──────┐ + │ │ LLM │ │MCP │ │Tools │ + │ └─────┘ └────┘ └──────┘ + │ │ + │ ┌──────┼──────┐ + │ ▼ ▼ ▼ + │ ┌──────┐ ┌──────────┐ ┌────────┐ + └─▶│Memory│ │Knowledge │ │Skills │ + │(SQLite)│(FTS5) │ │(.md) │ + └──────┘ └──────────┘ └────────┘ +``` + +## Dependencies Added + +| Crate | Purpose | +|-------|---------| +| `rusqlite` (bundled, modern_full) | SQLite database with FTS5 | +| `chrono` (serde) | Timestamps | +| `uuid` (v4) | Message/entry IDs | +| `async-trait` | Async trait support for Platform | +| `tokio-cron-scheduler` | Background task scheduling | diff --git a/skills/coding-assistant.md b/skills/coding-assistant.md new file mode 100644 index 0000000..761c116 --- /dev/null +++ b/skills/coding-assistant.md @@ -0,0 +1,25 @@ +--- +name: coding-assistant +description: Help users write, review, and debug 
code +tags: [coding, development] +--- + +# Coding Assistant + +When the user asks for help with code: + +1. **Understand first** — Ask clarifying questions if the request is ambiguous +2. **Read before writing** — Use read_file to understand existing code before modifying +3. **Small changes** — Make focused, minimal changes. Don't refactor unrelated code +4. **Explain your reasoning** — Briefly explain what you changed and why +5. **Test awareness** — Suggest how to test the changes if applicable + +When reviewing code: +- Point out bugs, security issues, and performance problems +- Suggest improvements but don't over-engineer +- Be specific — reference line numbers and provide fixed code + +When debugging: +- Ask for error messages and reproduction steps +- Use execute_command to investigate (check logs, run tests) +- Explain the root cause, not just the fix diff --git a/skills/memory-manager.md b/skills/memory-manager.md new file mode 100644 index 0000000..8119209 --- /dev/null +++ b/skills/memory-manager.md @@ -0,0 +1,30 @@ +--- +name: memory-manager +description: Proactively remember and recall useful information +tags: [memory, learning] +--- + +# Memory Manager + +You have persistent memory tools: `remember`, `recall`, and `search_memory`. + +## When to Remember + +Proactively use the `remember` tool when: +- The user tells you their name, preferences, or important context +- You learn something about their project or workflow +- The user corrects you — remember the correction +- You discover useful facts during tool use + +Categories to use: +- `user_preference` — User's stated preferences (language, style, etc.) +- `user_info` — Name, role, timezone, etc. 
+- `project` — Project-specific knowledge (architecture, conventions) +- `correction` — Things the user corrected you about +- `fact` — General facts learned during conversation + +## When to Recall + +- At the start of conversations, search memory for relevant user context +- Before making assumptions, check if you've remembered something relevant +- When the user references something from a past conversation diff --git a/src/agent.rs b/src/agent.rs new file mode 100644 index 0000000..08f0330 --- /dev/null +++ b/src/agent.rs @@ -0,0 +1,297 @@ +use anyhow::Result; +use tracing::info; + +use crate::config::Config; +use crate::llm::{ChatMessage, FunctionDefinition, LlmClient, ToolDefinition}; +use crate::mcp::McpManager; +use crate::memory::MemoryStore; +use crate::platform::IncomingMessage; +use crate::skills::SkillRegistry; +use crate::tools; + +/// The core agent that processes messages through LLM + tools. +/// Platform-agnostic — receives IncomingMessage, returns response text. +pub struct Agent { + pub llm: LlmClient, + pub config: Config, + pub mcp: McpManager, + pub memory: MemoryStore, + pub skills: SkillRegistry, +} + +impl Agent { + pub fn new( + config: Config, + mcp: McpManager, + memory: MemoryStore, + skills: SkillRegistry, + ) -> Self { + let llm = LlmClient::new(config.openrouter.clone()); + Self { + llm, + config, + mcp, + memory, + skills, + } + } + + /// Build the system prompt, incorporating loaded skills + fn build_system_prompt(&self) -> String { + let mut prompt = self.config.openrouter.system_prompt.clone(); + + let skill_context = self.skills.build_context(); + if !skill_context.is_empty() { + prompt.push_str("\n\n# Available Skills\n\n"); + prompt.push_str(&skill_context); + } + + prompt + } + + /// Process an incoming message and return the response text + pub async fn process_message(&self, incoming: &IncomingMessage) -> Result<String> { + let platform = &incoming.platform; + let user_id = &incoming.user_id; + + // Get or create persistent 
conversation + let conversation_id = self + .memory + .get_or_create_conversation(platform, user_id) + .await?; + + // Load existing messages from memory + let mut messages = self.memory.load_messages(&conversation_id).await?; + + // If no messages yet, add system prompt + if messages.is_empty() { + let system_msg = ChatMessage { + role: "system".to_string(), + content: Some(self.build_system_prompt()), + tool_calls: None, + tool_call_id: None, + }; + self.memory + .save_message(&conversation_id, &system_msg) + .await?; + messages.push(system_msg); + } + + // Add user message + let user_msg = ChatMessage { + role: "user".to_string(), + content: Some(incoming.text.clone()), + tool_calls: None, + tool_call_id: None, + }; + self.memory + .save_message(&conversation_id, &user_msg) + .await?; + messages.push(user_msg); + + // Gather all tool definitions + let mut all_tools: Vec = tools::builtin_tool_definitions(); + all_tools.extend(self.mcp.tool_definitions()); + all_tools.extend(self.memory_tool_definitions()); + + // Agentic loop — keep calling LLM until we get a non-tool response + let max_iterations = 10; + for iteration in 0..max_iterations { + let response = self.llm.chat(&messages, &all_tools).await?; + + if let Some(tool_calls) = &response.tool_calls { + if !tool_calls.is_empty() { + info!( + "LLM requested {} tool call(s) (iteration {})", + tool_calls.len(), + iteration + ); + + // Save assistant message with tool calls + self.memory + .save_message(&conversation_id, &response) + .await?; + messages.push(response.clone()); + + // Execute each tool call + for tool_call in tool_calls { + let arguments: serde_json::Value = + serde_json::from_str(&tool_call.function.arguments) + .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); + + let tool_result = self + .execute_tool(&tool_call.function.name, &arguments) + .await; + + info!( + "Tool '{}' result length: {} chars", + tool_call.function.name, + tool_result.len() + ); + + let tool_msg = ChatMessage { + 
role: "tool".to_string(), + content: Some(tool_result), + tool_calls: None, + tool_call_id: Some(tool_call.id.clone()), + }; + self.memory + .save_message(&conversation_id, &tool_msg) + .await?; + messages.push(tool_msg); + } + + continue; + } + } + + // Final response — no tool calls + let content = response.content.clone().unwrap_or_default(); + self.memory + .save_message(&conversation_id, &response) + .await?; + + return Ok(content); + } + + Ok("I've reached the maximum number of tool call iterations. Please try rephrasing your request.".to_string()) + } + + /// Clear conversation history for a user + pub async fn clear_conversation(&self, platform: &str, user_id: &str) -> Result<()> { + self.memory.clear_conversation(platform, user_id).await + } + + /// Get all tool definitions for display + pub fn all_tool_definitions(&self) -> Vec { + let mut all_tools = tools::builtin_tool_definitions(); + all_tools.extend(self.mcp.tool_definitions()); + all_tools.extend(self.memory_tool_definitions()); + all_tools + } + + /// Memory-related tool definitions exposed to the LLM + fn memory_tool_definitions(&self) -> Vec { + use serde_json::json; + + vec![ + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "remember".to_string(), + description: "Store a piece of knowledge for long-term memory. 
Use this to remember user preferences, facts, or anything useful.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "category": { "type": "string", "description": "Category (e.g., 'user_preference', 'fact', 'project')" }, + "key": { "type": "string", "description": "Short identifier for this knowledge" }, + "value": { "type": "string", "description": "The knowledge to remember" } + }, + "required": ["category", "key", "value"] + }), + }, + }, + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "recall".to_string(), + description: "Retrieve a specific piece of remembered knowledge.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "category": { "type": "string", "description": "Category to search in" }, + "key": { "type": "string", "description": "The key to look up" } + }, + "required": ["category", "key"] + }), + }, + }, + ToolDefinition { + tool_type: "function".to_string(), + function: FunctionDefinition { + name: "search_memory".to_string(), + description: "Search through past conversations and knowledge using hybrid vector + full-text search. 
Finds semantically similar content even with different wording.".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "query": { "type": "string", "description": "Search query (natural language)" }, + "limit": { "type": "integer", "description": "Max results (default 5)" } + }, + "required": ["query"] + }), + }, + }, + ] + } + + /// Execute a tool call by routing to the right handler + async fn execute_tool(&self, name: &str, arguments: &serde_json::Value) -> String { + match name { + "remember" => { + let category = arguments["category"].as_str().unwrap_or("general"); + let key = arguments["key"].as_str().unwrap_or(""); + let value = arguments["value"].as_str().unwrap_or(""); + match self.memory.remember(category, key, value, None).await { + Ok(()) => format!("Remembered: [{}] {} = {}", category, key, value), + Err(e) => format!("Failed to remember: {}", e), + } + } + "recall" => { + let category = arguments["category"].as_str().unwrap_or("general"); + let key = arguments["key"].as_str().unwrap_or(""); + match self.memory.recall(category, key).await { + Ok(Some(value)) => value, + Ok(None) => format!("No knowledge found for [{}] {}", category, key), + Err(e) => format!("Failed to recall: {}", e), + } + } + "search_memory" => { + let query = arguments["query"].as_str().unwrap_or(""); + let limit = arguments["limit"].as_u64().unwrap_or(5) as usize; + + let mut results = Vec::new(); + + // Search conversations (hybrid vector + FTS5) + if let Ok(msgs) = self.memory.search_messages(query, limit).await { + for msg in msgs { + if let Some(content) = &msg.content { + results.push(format!("[{}]: {}", msg.role, content)); + } + } + } + + // Search knowledge (hybrid vector + FTS5) + if let Ok(entries) = self.memory.search_knowledge(query, limit).await { + for entry in entries { + results.push(format!( + "[knowledge:{}] {} = {}", + entry.category, entry.key, entry.value + )); + } + } + + if results.is_empty() { + "No results found.".to_string() + } else 
{ + results.join("\n\n") + } + } + _ if self.mcp.is_mcp_tool(name) => match self.mcp.call_tool(name, arguments).await { + Ok(result) => result, + Err(e) => format!("MCP tool error: {}", e), + }, + _ => { + match tools::execute_builtin_tool( + name, + arguments, + &self.config.sandbox.allowed_directory, + ) + .await + { + Ok(result) => result, + Err(e) => format!("Tool error: {}", e), + } + } + } + } +} diff --git a/src/bot.rs b/src/bot.rs deleted file mode 100644 index 1da1ce2..0000000 --- a/src/bot.rs +++ /dev/null @@ -1,311 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use anyhow::Result; -use teloxide::prelude::*; -use tokio::sync::Mutex; -use tracing::{error, info, warn}; - -use crate::config::Config; -use crate::llm::{ChatMessage, LlmClient, ToolDefinition}; -use crate::mcp::McpManager; -use crate::tools; - -/// Per-user conversation state -struct Conversation { - messages: Vec, -} - -impl Conversation { - fn new(system_prompt: &str) -> Self { - Self { - messages: vec![ChatMessage { - role: "system".to_string(), - content: Some(system_prompt.to_string()), - tool_calls: None, - tool_call_id: None, - }], - } - } -} - -/// Shared application state -pub struct AppState { - llm: LlmClient, - config: Config, - mcp: McpManager, - conversations: Mutex>, -} - -impl AppState { - pub fn new(config: Config, mcp: McpManager) -> Self { - let llm = LlmClient::new(config.openrouter.clone()); - Self { - llm, - config, - mcp, - conversations: Mutex::new(HashMap::new()), - } - } -} - -/// Start the Telegram bot -pub async fn run(state: Arc) -> Result<()> { - let bot = Bot::new(&state.config.telegram.bot_token); - - info!("Starting Telegram bot..."); - - let allowed_users = state.config.telegram.allowed_user_ids.clone(); - - let handler = Update::filter_message() - .filter_map(move |msg: Message| { - let user = msg.from.as_ref()?; - if allowed_users.contains(&user.id.0) { - Some(msg) - } else { - None - } - }) - .endpoint(handle_message); - - 
Dispatcher::builder(bot, handler) - .dependencies(dptree::deps![state]) - .default_handler(|upd| async move { - warn!("Unhandled update: {:?}", upd.id); - }) - .error_handler(LoggingErrorHandler::with_custom_text("bot")) - .build() - .dispatch() - .await; - - Ok(()) -} - -async fn handle_message(bot: Bot, msg: Message, state: Arc) -> ResponseResult<()> { - let user_id = match msg.from.as_ref() { - Some(user) => user.id.0, - None => return Ok(()), - }; - - let text = match msg.text() { - Some(t) => t.to_string(), - None => return Ok(()), - }; - - info!("Message from user {}: {}", user_id, text); - - // Handle /clear command to reset conversation - if text == "/clear" { - let mut conversations = state.conversations.lock().await; - conversations.remove(&user_id); - bot.send_message(msg.chat.id, "Conversation cleared.") - .await?; - return Ok(()); - } - - // Handle /start command - if text == "/start" { - bot.send_message( - msg.chat.id, - "Hello! I'm your AI assistant. Send me a message and I'll help you.\n\n\ - Commands:\n\ - /clear - Clear conversation history\n\ - /tools - List available tools", - ) - .await?; - return Ok(()); - } - - // Handle /tools command - if text == "/tools" { - let builtin = tools::builtin_tool_definitions(); - let mcp_tools = state.mcp.tool_definitions(); - - let mut tool_list = String::from("Available tools:\n\n"); - tool_list.push_str("Built-in tools:\n"); - for tool in &builtin { - tool_list.push_str(&format!( - " - {}: {}\n", - tool.function.name, tool.function.description - )); - } - - if !mcp_tools.is_empty() { - tool_list.push_str("\nMCP tools:\n"); - for tool in &mcp_tools { - tool_list.push_str(&format!( - " - {}: {}\n", - tool.function.name, tool.function.description - )); - } - } - - bot.send_message(msg.chat.id, tool_list).await?; - return Ok(()); - } - - // Send "typing" indicator - bot.send_chat_action(msg.chat.id, teloxide::types::ChatAction::Typing) - .await - .ok(); - - // Process the message with the LLM - match 
process_with_llm(&state, user_id, &text).await { - Ok(response) => { - // Split long messages (Telegram has a 4096 char limit) - for chunk in split_message(&response, 4000) { - // Try sending, ignore errors for individual chunks - bot.send_message(msg.chat.id, chunk).await.ok(); - } - } - Err(e) => { - error!("Error processing message: {:#}", e); - bot.send_message(msg.chat.id, format!("Error: {}", e)) - .await?; - } - } - - Ok(()) -} - -async fn process_with_llm(state: &AppState, user_id: u64, text: &str) -> Result { - // Get or create conversation - { - let mut conversations = state.conversations.lock().await; - conversations - .entry(user_id) - .or_insert_with(|| Conversation::new(&state.config.openrouter.system_prompt)); - - // Add user message - let conv = conversations.get_mut(&user_id).unwrap(); - conv.messages.push(ChatMessage { - role: "user".to_string(), - content: Some(text.to_string()), - tool_calls: None, - tool_call_id: None, - }); - } - - // Gather all tool definitions - let mut all_tools: Vec = tools::builtin_tool_definitions(); - all_tools.extend(state.mcp.tool_definitions()); - - // Agentic loop - keep calling LLM until we get a non-tool response - let max_iterations = 10; - for iteration in 0..max_iterations { - let messages = { - let conversations = state.conversations.lock().await; - conversations[&user_id].messages.clone() - }; - - let response = state.llm.chat(&messages, &all_tools).await?; - - // Check if the LLM wants to call tools - if let Some(tool_calls) = &response.tool_calls { - if !tool_calls.is_empty() { - info!( - "LLM requested {} tool call(s) (iteration {})", - tool_calls.len(), - iteration - ); - - // Add assistant message with tool calls to conversation - { - let mut conversations = state.conversations.lock().await; - let conv = conversations.get_mut(&user_id).unwrap(); - conv.messages.push(response.clone()); - } - - // Execute each tool call - for tool_call in tool_calls { - let arguments: serde_json::Value = - 
serde_json::from_str(&tool_call.function.arguments) - .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); - - let tool_result = - execute_tool(state, &tool_call.function.name, &arguments).await; - - info!( - "Tool '{}' result length: {} chars", - tool_call.function.name, - tool_result.len() - ); - - // Add tool result to conversation - { - let mut conversations = state.conversations.lock().await; - let conv = conversations.get_mut(&user_id).unwrap(); - conv.messages.push(ChatMessage { - role: "tool".to_string(), - content: Some(tool_result), - tool_calls: None, - tool_call_id: Some(tool_call.id.clone()), - }); - } - } - - // Continue the loop to let the LLM process tool results - continue; - } - } - - // No tool calls - we have a final response - let content = response.content.clone().unwrap_or_default(); - - // Add assistant response to conversation - { - let mut conversations = state.conversations.lock().await; - let conv = conversations.get_mut(&user_id).unwrap(); - conv.messages.push(response); - } - - return Ok(content); - } - - Ok("I've reached the maximum number of tool call iterations. 
Please try rephrasing your request.".to_string()) -} - -async fn execute_tool(state: &AppState, name: &str, arguments: &serde_json::Value) -> String { - if state.mcp.is_mcp_tool(name) { - match state.mcp.call_tool(name, arguments).await { - Ok(result) => result, - Err(e) => format!("MCP tool error: {}", e), - } - } else { - match tools::execute_builtin_tool(name, arguments, &state.config.sandbox.allowed_directory) - .await - { - Ok(result) => result, - Err(e) => format!("Tool error: {}", e), - } - } -} - -fn split_message(text: &str, max_len: usize) -> Vec { - if text.len() <= max_len { - return vec![text.to_string()]; - } - - let mut chunks = Vec::new(); - let mut start = 0; - - while start < text.len() { - let end = (start + max_len).min(text.len()); - - // Try to split at a newline or space - let actual_end = if end < text.len() { - text[start..end] - .rfind('\n') - .or_else(|| text[start..end].rfind(' ')) - .map(|pos| start + pos + 1) - .unwrap_or(end) - } else { - end - }; - - chunks.push(text[start..actual_end].to_string()); - start = actual_end; - } - - chunks -} diff --git a/src/config.rs b/src/config.rs index e3b8e78..ce57827 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,22 @@ pub struct Config { pub sandbox: SandboxConfig, #[serde(default)] pub mcp_servers: Vec, + #[serde(default = "default_memory_config")] + pub memory: MemoryConfig, + #[serde(default = "default_skills_config")] + pub skills: SkillsConfig, + pub embedding: Option, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct EmbeddingApiConfig { + pub api_key: String, + #[serde(default = "default_embedding_base_url")] + pub base_url: String, + #[serde(default = "default_embedding_model")] + pub model: String, + #[serde(default = "default_embedding_dimensions")] + pub dimensions: usize, } #[derive(Debug, Deserialize, Clone)] @@ -45,6 +61,18 @@ pub struct McpServerConfig { pub env: std::collections::HashMap, } +#[derive(Debug, Deserialize, Clone)] +pub struct MemoryConfig { + 
#[serde(default = "default_db_path")] + pub database_path: PathBuf, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct SkillsConfig { + #[serde(default = "default_skills_dir")] + pub directory: PathBuf, +} + fn default_model() -> String { "qwen/qwen3-235b-a22b".to_string() } @@ -64,6 +92,38 @@ fn default_system_prompt() -> String { .to_string() } +fn default_db_path() -> PathBuf { + PathBuf::from("rustbot.db") +} + +fn default_skills_dir() -> PathBuf { + PathBuf::from("skills") +} + +fn default_embedding_base_url() -> String { + "https://openrouter.ai/api/v1".to_string() +} + +fn default_embedding_model() -> String { + "openai/text-embedding-3-small".to_string() +} + +fn default_embedding_dimensions() -> usize { + 1536 +} + +fn default_memory_config() -> MemoryConfig { + MemoryConfig { + database_path: default_db_path(), + } +} + +fn default_skills_config() -> SkillsConfig { + SkillsConfig { + directory: default_skills_dir(), + } +} + impl Config { pub fn load(path: &Path) -> Result { let content = std::fs::read_to_string(path) diff --git a/src/main.rs b/src/main.rs index e512565..f53b00c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ -mod bot; +mod agent; mod config; mod llm; mod mcp; +mod memory; +mod platform; +mod scheduler; +mod skills; mod tools; use std::path::PathBuf; @@ -11,9 +15,13 @@ use anyhow::{Context, Result}; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use crate::bot::AppState; +use crate::agent::Agent; use crate::config::Config; use crate::mcp::McpManager; +use crate::memory::MemoryStore; +use crate::scheduler::tasks::register_builtin_tasks; +use crate::scheduler::Scheduler; +use crate::skills::loader::load_skills_from_dir; #[tokio::main] async fn main() -> Result<()> { @@ -38,26 +46,57 @@ async fn main() -> Result<()> { info!("Configuration loaded successfully"); info!(" Model: {}", config.openrouter.model); - info!( - " Sandbox: {}", - config.sandbox.allowed_directory.display() - 
); - info!( - " Allowed users: {:?}", - config.telegram.allowed_user_ids - ); + info!(" Sandbox: {}", config.sandbox.allowed_directory.display()); + info!(" Allowed users: {:?}", config.telegram.allowed_user_ids); info!(" MCP servers: {}", config.mcp_servers.len()); + // Build embedding config if configured + let embedding_config = + config + .embedding + .as_ref() + .map(|cfg| crate::memory::embeddings::EmbeddingConfig { + api_key: cfg.api_key.clone(), + base_url: cfg.base_url.clone(), + model: cfg.model.clone(), + dimensions: cfg.dimensions, + }); + + // Initialize memory store (SQLite + vector embeddings) + let memory = MemoryStore::open(&config.memory.database_path, embedding_config) + .context("Failed to initialize memory store")?; + info!(" Database: {}", config.memory.database_path.display()); + // Initialize MCP connections let mut mcp_manager = McpManager::new(); mcp_manager.connect_all(&config.mcp_servers).await; - // Create shared state - let state = Arc::new(AppState::new(config, mcp_manager)); + // Load skills from markdown files + let skills = load_skills_from_dir(&config.skills.directory).await?; + info!(" Skills: {}", skills.len()); + + // Create the agent + let agent = Arc::new(Agent::new( + config.clone(), + mcp_manager, + memory.clone(), + skills, + )); + + // Initialize background task scheduler + let scheduler = Scheduler::new().await?; + register_builtin_tasks(&scheduler, memory).await?; + scheduler.start().await?; + info!(" Scheduler: active"); - // Run the Telegram bot + // Run the Telegram platform info!("Bot is starting..."); - bot::run(state).await?; + platform::telegram::run( + agent, + config.telegram.allowed_user_ids.clone(), + &config.telegram.bot_token, + ) + .await?; Ok(()) } diff --git a/src/mcp.rs b/src/mcp.rs index 89257e9..ca20eef 100644 --- a/src/mcp.rs +++ b/src/mcp.rs @@ -53,12 +53,16 @@ impl McpManager { })) .with_context(|| format!("Failed to start MCP server process: {}", config.name))?; - let client = 
().serve(transport).await.with_context(|| { - format!("Failed to initialize MCP connection: {}", config.name) - })?; + let client = () + .serve(transport) + .await + .with_context(|| format!("Failed to initialize MCP connection: {}", config.name))?; let server_info = client.peer_info(); - info!("Connected to MCP server '{}': {:?}", config.name, server_info); + info!( + "Connected to MCP server '{}': {:?}", + config.name, server_info + ); let tools = client .list_all_tools() @@ -132,7 +136,11 @@ impl McpManager { let prefix = format!("{}_", connection.name); if let Some(tool_name) = without_mcp.strip_prefix(&prefix) { // Verify this tool exists on this server - if connection.tools.iter().any(|t| t.name.as_ref() == tool_name) { + if connection + .tools + .iter() + .any(|t| t.name.as_ref() == tool_name) + { info!( "Calling MCP tool '{}' on server '{}'", tool_name, connection.name @@ -180,6 +188,7 @@ impl McpManager { } /// Shutdown all MCP connections + #[allow(dead_code)] pub async fn shutdown(&mut self) { for (name, connection) in self.connections.drain() { info!("Shutting down MCP server: {}", name); diff --git a/src/memory/conversations.rs b/src/memory/conversations.rs new file mode 100644 index 0000000..1de8ab5 --- /dev/null +++ b/src/memory/conversations.rs @@ -0,0 +1,249 @@ +use anyhow::{Context, Result}; +use uuid::Uuid; + +use super::MemoryStore; +use crate::llm::ChatMessage; + +/// Cast a &[f32] to &[u8] for SQLite blob storage +pub(crate) fn f32_slice_to_bytes(floats: &[f32]) -> &[u8] { + unsafe { std::slice::from_raw_parts(floats.as_ptr() as *const u8, floats.len() * 4) } +} + +/// Cast Vec to Vec for SQLite blob storage +pub(crate) fn f32_vec_to_bytes(floats: &[f32]) -> Vec { + f32_slice_to_bytes(floats).to_vec() +} + +impl MemoryStore { + /// Get or create a conversation for a platform user + pub async fn get_or_create_conversation( + &self, + platform: &str, + user_id: &str, + ) -> Result { + let conn = self.conn.lock().await; + + // Try to find an 
existing active conversation + let existing: Option = conn + .query_row( + "SELECT id FROM conversations + WHERE platform = ?1 AND user_id = ?2 + ORDER BY updated_at DESC LIMIT 1", + rusqlite::params![platform, user_id], + |row| row.get(0), + ) + .ok(); + + if let Some(id) = existing { + return Ok(id); + } + + // Create a new conversation + let id = Uuid::new_v4().to_string(); + conn.execute( + "INSERT INTO conversations (id, platform, user_id) VALUES (?1, ?2, ?3)", + rusqlite::params![&id, platform, user_id], + ) + .context("Failed to create conversation")?; + + Ok(id) + } + + /// Save a message to a conversation, with optional vector embedding + pub async fn save_message( + &self, + conversation_id: &str, + message: &ChatMessage, + ) -> Result { + let id = Uuid::new_v4().to_string(); + let tool_calls_json = message + .tool_calls + .as_ref() + .map(|tc| serde_json::to_string(tc).unwrap_or_default()); + + // Generate embedding before acquiring the DB lock (async HTTP call) + let embedding = if let Some(content) = &message.content { + if !content.is_empty() && message.role != "tool" { + self.embeddings.try_embed_one(content).await + } else { + None + } + } else { + None + }; + + let conn = self.conn.lock().await; + + conn.execute( + "INSERT INTO messages (id, conversation_id, role, content, tool_calls, tool_call_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + rusqlite::params![ + &id, + conversation_id, + &message.role, + &message.content, + &tool_calls_json, + &message.tool_call_id, + ], + ) + .context("Failed to save message")?; + + let rowid = conn.last_insert_rowid(); + + // Update conversation timestamp + conn.execute( + "UPDATE conversations SET updated_at = datetime('now') WHERE id = ?1", + rusqlite::params![conversation_id], + )?; + + // Store vector embedding if available + if let Some(ref emb) = embedding { + let embedding_bytes = f32_slice_to_bytes(emb); + conn.execute( + "INSERT INTO message_embeddings (rowid, embedding) VALUES (?1, ?2)", + 
rusqlite::params![rowid, embedding_bytes], + )?; + } + + Ok(id) + } + + /// Load all messages for a conversation + pub async fn load_messages(&self, conversation_id: &str) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT role, content, tool_calls, tool_call_id + FROM messages + WHERE conversation_id = ?1 + ORDER BY created_at ASC", + )?; + + let messages = stmt + .query_map(rusqlite::params![conversation_id], |row| { + let tool_calls_json: Option = row.get(2)?; + let tool_calls = tool_calls_json.and_then(|json| serde_json::from_str(&json).ok()); + + Ok(ChatMessage { + role: row.get(0)?, + content: row.get(1)?, + tool_calls, + tool_call_id: row.get(3)?, + }) + })? + .collect::, _>>() + .context("Failed to load messages")?; + + Ok(messages) + } + + /// Clear a conversation (delete all its messages and embeddings) + pub async fn clear_conversation(&self, platform: &str, user_id: &str) -> Result<()> { + let conn = self.conn.lock().await; + + // Delete embeddings for messages in this conversation + conn.execute( + "DELETE FROM message_embeddings WHERE rowid IN ( + SELECT m.rowid FROM messages m + JOIN conversations c ON m.conversation_id = c.id + WHERE c.platform = ?1 AND c.user_id = ?2 + )", + rusqlite::params![platform, user_id], + )?; + + conn.execute( + "DELETE FROM messages WHERE conversation_id IN ( + SELECT id FROM conversations WHERE platform = ?1 AND user_id = ?2 + )", + rusqlite::params![platform, user_id], + )?; + + conn.execute( + "DELETE FROM conversations WHERE platform = ?1 AND user_id = ?2", + rusqlite::params![platform, user_id], + )?; + + Ok(()) + } + + /// Hybrid search across messages using Reciprocal Rank Fusion (vector + FTS5). + /// Falls back to FTS5-only if embeddings are not available. 
+ pub async fn search_messages(&self, query: &str, limit: usize) -> Result> { + // Try to get query embedding for vector search + let query_embedding = self.embeddings.try_embed_one(query).await; + + let conn = self.conn.lock().await; + + if let Some(ref qe) = query_embedding { + // Hybrid search with Reciprocal Rank Fusion + let query_bytes = f32_vec_to_bytes(qe); + let sql = " + WITH vec_matches AS ( + SELECT rowid, distance, + row_number() OVER (ORDER BY distance) as rank_number + FROM message_embeddings + WHERE embedding MATCH ?1 + ORDER BY distance + LIMIT ?2 + ), + fts_matches AS ( + SELECT rowid, + row_number() OVER (ORDER BY rank) as rank_number + FROM messages_fts + WHERE messages_fts MATCH ?3 + LIMIT ?2 + ) + SELECT m.role, m.content, m.tool_calls, m.tool_call_id, + coalesce(1.0 / (60 + fts.rank_number), 0.0) * 0.5 + + coalesce(1.0 / (60 + vec.rank_number), 0.0) * 0.5 as combined_rank + FROM messages m + LEFT JOIN vec_matches vec ON m.rowid = vec.rowid + LEFT JOIN fts_matches fts ON m.rowid = fts.rowid + WHERE vec.rowid IS NOT NULL OR fts.rowid IS NOT NULL + ORDER BY combined_rank DESC + LIMIT ?2 + "; + + let search_limit = (limit * 3) as i64; + let mut stmt = conn.prepare(sql)?; + let messages = stmt + .query_map(rusqlite::params![query_bytes, search_limit, query], |row| { + parse_message_row(row) + })? + .collect::, _>>() + .context("Failed to hybrid-search messages")?; + + Ok(messages.into_iter().take(limit).collect()) + } else { + // FTS5-only fallback + let sql = " + SELECT m.role, m.content, m.tool_calls, m.tool_call_id + FROM messages m + JOIN messages_fts fts ON m.rowid = fts.rowid + WHERE messages_fts MATCH ?1 + ORDER BY fts.rank + LIMIT ?2 + "; + let mut stmt = conn.prepare(sql)?; + let messages = stmt + .query_map(rusqlite::params![query, limit as i64], |row| { + parse_message_row(row) + })? 
+ .collect::, _>>() + .context("Failed to FTS-search messages")?; + + Ok(messages) + } + } +} + +fn parse_message_row(row: &rusqlite::Row) -> rusqlite::Result { + let tool_calls_json: Option = row.get(2)?; + let tool_calls = tool_calls_json.and_then(|json| serde_json::from_str(&json).ok()); + + Ok(ChatMessage { + role: row.get(0)?, + content: row.get(1)?, + tool_calls, + tool_call_id: row.get(3)?, + }) +} diff --git a/src/memory/embeddings.rs b/src/memory/embeddings.rs new file mode 100644 index 0000000..61a378b --- /dev/null +++ b/src/memory/embeddings.rs @@ -0,0 +1,120 @@ +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use tracing::{info, warn}; + +/// Embedding engine that calls an OpenAI-compatible /v1/embeddings API. +/// Works with OpenRouter, OpenAI, Ollama, or any compatible provider. +pub struct EmbeddingEngine { + client: reqwest::Client, + config: Option, +} + +/// Configuration for the embedding API +#[derive(Debug, Clone)] +pub struct EmbeddingConfig { + pub api_key: String, + pub base_url: String, + pub model: String, + pub dimensions: usize, +} + +#[derive(Serialize)] +struct EmbeddingRequest { + model: String, + input: Vec, +} + +#[derive(Deserialize)] +struct EmbeddingResponse { + data: Vec, +} + +#[derive(Deserialize)] +struct EmbeddingData { + embedding: Vec, +} + +impl EmbeddingEngine { + /// Create an embedding engine with API configuration. + /// If config is None, embedding features are disabled (FTS5-only fallback). + pub fn new(config: Option) -> Self { + if let Some(ref cfg) = config { + info!( + "Embedding engine configured: model={}, dims={}, url={}", + cfg.model, cfg.dimensions, cfg.base_url + ); + } else { + info!("Embedding engine disabled (no embedding config). 
Using FTS5-only search."); + } + Self { + client: reqwest::Client::new(), + config, + } + } + + /// Whether vector embeddings are available + pub fn is_available(&self) -> bool { + self.config.is_some() + } + + /// Embedding dimensions (default 384) + pub fn dimensions(&self) -> usize { + self.config.as_ref().map(|c| c.dimensions).unwrap_or(384) + } + + /// Generate a single embedding for one text via API + pub async fn embed_one(&self, text: &str) -> Result> { + let config = self + .config + .as_ref() + .context("Embedding engine not configured")?; + + let url = format!("{}/embeddings", config.base_url); + + let request = EmbeddingRequest { + model: config.model.clone(), + input: vec![text.to_string()], + }; + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", config.api_key)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await + .context("Failed to call embedding API")?; + + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("Embedding API error ({}): {}", status, body); + } + + let resp: EmbeddingResponse = response + .json() + .await + .context("Failed to parse embedding response")?; + + resp.data + .into_iter() + .next() + .map(|d| d.embedding) + .context("No embedding returned from API") + } + + /// Try to generate an embedding, returning None if not available or on error + pub async fn try_embed_one(&self, text: &str) -> Option> { + if !self.is_available() { + return None; + } + match self.embed_one(text).await { + Ok(embedding) => Some(embedding), + Err(e) => { + warn!("Embedding generation failed: {}", e); + None + } + } + } +} diff --git a/src/memory/knowledge.rs b/src/memory/knowledge.rs new file mode 100644 index 0000000..7e825cf --- /dev/null +++ b/src/memory/knowledge.rs @@ -0,0 +1,219 @@ +use anyhow::{Context, Result}; +use uuid::Uuid; + +use super::MemoryStore; +use 
crate::memory::conversations::{f32_slice_to_bytes, f32_vec_to_bytes}; + +/// A knowledge entry the agent has learned +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct KnowledgeEntry { + pub id: String, + pub category: String, + pub key: String, + pub value: String, + pub source: Option, +} + +impl MemoryStore { + /// Store or update a knowledge entry with vector embedding + pub async fn remember( + &self, + category: &str, + key: &str, + value: &str, + source: Option<&str>, + ) -> Result<()> { + let id = Uuid::new_v4().to_string(); + + // Generate embedding before DB lock (async HTTP call) + let embed_text = format!("{}: {}", key, value); + let embedding = self.embeddings.try_embed_one(&embed_text).await; + + let conn = self.conn.lock().await; + + // Check if entry exists (for update case — need to remove old embedding) + let old_rowid: Option = conn + .query_row( + "SELECT rowid FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + |row| row.get(0), + ) + .ok(); + + if let Some(old_rowid) = old_rowid { + conn.execute( + "DELETE FROM knowledge_embeddings WHERE rowid = ?1", + rusqlite::params![old_rowid], + )?; + } + + conn.execute( + "INSERT INTO knowledge (id, category, key, value, source) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(category, key) DO UPDATE SET + value = excluded.value, + source = excluded.source, + updated_at = datetime('now')", + rusqlite::params![&id, category, key, value, source], + ) + .context("Failed to store knowledge")?; + + // Get the rowid for embedding + let rowid: i64 = conn.query_row( + "SELECT rowid FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + |row| row.get(0), + )?; + + // Store embedding if available + if let Some(ref emb) = embedding { + let embedding_bytes = f32_slice_to_bytes(emb); + conn.execute( + "INSERT INTO knowledge_embeddings (rowid, embedding) VALUES (?1, ?2)", + rusqlite::params![rowid, embedding_bytes], + )?; + } + + Ok(()) + } + + 
/// Recall a specific knowledge entry by exact key + pub async fn recall(&self, category: &str, key: &str) -> Result> { + let conn = self.conn.lock().await; + let result = conn + .query_row( + "SELECT value FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + |row| row.get(0), + ) + .ok(); + + Ok(result) + } + + /// Hybrid search across knowledge using Reciprocal Rank Fusion (vector + FTS5). + /// Falls back to FTS5-only if embeddings are not available. + pub async fn search_knowledge(&self, query: &str, limit: usize) -> Result> { + let query_embedding = self.embeddings.try_embed_one(query).await; + + let conn = self.conn.lock().await; + + if let Some(ref qe) = query_embedding { + // Hybrid search with Reciprocal Rank Fusion + let query_bytes = f32_vec_to_bytes(qe); + let sql = " + WITH vec_matches AS ( + SELECT rowid, distance, + row_number() OVER (ORDER BY distance) as rank_number + FROM knowledge_embeddings + WHERE embedding MATCH ?1 + ORDER BY distance + LIMIT ?2 + ), + fts_matches AS ( + SELECT rowid, + row_number() OVER (ORDER BY rank) as rank_number + FROM knowledge_fts + WHERE knowledge_fts MATCH ?3 + LIMIT ?2 + ) + SELECT k.id, k.category, k.key, k.value, k.source, + coalesce(1.0 / (60 + fts.rank_number), 0.0) * 0.5 + + coalesce(1.0 / (60 + vec.rank_number), 0.0) * 0.5 as combined_rank + FROM knowledge k + LEFT JOIN vec_matches vec ON k.rowid = vec.rowid + LEFT JOIN fts_matches fts ON k.rowid = fts.rowid + WHERE vec.rowid IS NOT NULL OR fts.rowid IS NOT NULL + ORDER BY combined_rank DESC + LIMIT ?2 + "; + + let search_limit = (limit * 3) as i64; + let mut stmt = conn.prepare(sql)?; + let entries = stmt + .query_map(rusqlite::params![query_bytes, search_limit, query], |row| { + parse_knowledge_row(row) + })? 
+ .collect::, _>>() + .context("Failed to hybrid-search knowledge")?; + + Ok(entries.into_iter().take(limit).collect()) + } else { + // FTS5-only fallback + let sql = " + SELECT k.id, k.category, k.key, k.value, k.source + FROM knowledge k + JOIN knowledge_fts fts ON k.rowid = fts.rowid + WHERE knowledge_fts MATCH ?1 + ORDER BY fts.rank + LIMIT ?2 + "; + let mut stmt = conn.prepare(sql)?; + let entries = stmt + .query_map(rusqlite::params![query, limit as i64], |row| { + parse_knowledge_row(row) + })? + .collect::, _>>() + .context("Failed to FTS-search knowledge")?; + + Ok(entries) + } + } + + /// List all knowledge in a category + #[allow(dead_code)] + pub async fn list_knowledge(&self, category: &str) -> Result> { + let conn = self.conn.lock().await; + let mut stmt = conn.prepare( + "SELECT id, category, key, value, source + FROM knowledge + WHERE category = ?1 + ORDER BY key", + )?; + + let entries = stmt + .query_map(rusqlite::params![category], parse_knowledge_row)? + .collect::, _>>() + .context("Failed to list knowledge")?; + + Ok(entries) + } + + /// Forget a knowledge entry + #[allow(dead_code)] + pub async fn forget(&self, category: &str, key: &str) -> Result { + let conn = self.conn.lock().await; + + let rowid: Option = conn + .query_row( + "SELECT rowid FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + |row| row.get(0), + ) + .ok(); + + if let Some(rowid) = rowid { + conn.execute( + "DELETE FROM knowledge_embeddings WHERE rowid = ?1", + rusqlite::params![rowid], + )?; + } + + let rows = conn.execute( + "DELETE FROM knowledge WHERE category = ?1 AND key = ?2", + rusqlite::params![category, key], + )?; + Ok(rows > 0) + } +} + +fn parse_knowledge_row(row: &rusqlite::Row) -> rusqlite::Result { + Ok(KnowledgeEntry { + id: row.get(0)?, + category: row.get(1)?, + key: row.get(2)?, + value: row.get(3)?, + source: row.get(4)?, + }) +} diff --git a/src/memory/mod.rs b/src/memory/mod.rs new file mode 100644 index 
0000000..b0014ab --- /dev/null +++ b/src/memory/mod.rs @@ -0,0 +1,267 @@ +pub mod conversations; +pub mod embeddings; +pub mod knowledge; + +use anyhow::{Context, Result}; +use rusqlite::{Connection, OptionalExtension}; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::info; + +use crate::memory::embeddings::{EmbeddingConfig, EmbeddingEngine}; + +/// Thread-safe SQLite memory store with hybrid vector+FTS5 search +#[derive(Clone)] +pub struct MemoryStore { + conn: Arc>, + pub embeddings: Arc, +} + +impl MemoryStore { + /// Open or create the SQLite database at the given path. + /// If `embedding_config` is provided, vector search is enabled alongside FTS5. + /// If None, falls back to FTS5-only search. + pub fn open(path: &Path, embedding_config: Option) -> Result { + // Register sqlite-vec extension before opening any connection + unsafe { + type VecInitFn = unsafe extern "C" fn( + *mut rusqlite::ffi::sqlite3, + *mut *mut i8, + *const rusqlite::ffi::sqlite3_api_routines, + ) -> i32; + rusqlite::ffi::sqlite3_auto_extension(Some( + std::mem::transmute::<*const (), VecInitFn>( + sqlite_vec::sqlite3_vec_init as *const (), + ), + )); + } + + let conn = Connection::open(path) + .with_context(|| format!("Failed to open database: {}", path.display()))?; + + // Enable WAL mode for better concurrent read performance + // journal_mode PRAGMA always returns the resulting mode, so use query_row + let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))?; + conn.execute_batch("PRAGMA foreign_keys=ON;")?; + + let embeddings = EmbeddingEngine::new(embedding_config); + + // Run migrations on the raw connection before wrapping in Mutex. + // This avoids blocking_lock() panic when called from async context. 
+ Self::run_migrations(&conn, embeddings.dimensions())?; + + let store = Self { + conn: Arc::new(Mutex::new(conn)), + embeddings: Arc::new(embeddings), + }; + + info!("Memory store initialized at: {}", path.display()); + Ok(store) + } + + /// Open an in-memory database (for testing) + #[allow(dead_code)] + pub fn open_in_memory() -> Result { + unsafe { + type VecInitFn = unsafe extern "C" fn( + *mut rusqlite::ffi::sqlite3, + *mut *mut i8, + *const rusqlite::ffi::sqlite3_api_routines, + ) -> i32; + rusqlite::ffi::sqlite3_auto_extension(Some( + std::mem::transmute::<*const (), VecInitFn>( + sqlite_vec::sqlite3_vec_init as *const (), + ), + )); + } + + let conn = Connection::open_in_memory()?; + conn.execute_batch("PRAGMA foreign_keys=ON;")?; + + let embeddings = EmbeddingEngine::new(None); + + Self::run_migrations(&conn, embeddings.dimensions())?; + + let store = Self { + conn: Arc::new(Mutex::new(conn)), + embeddings: Arc::new(embeddings), + }; + Ok(store) + } + + fn run_migrations(conn: &Connection, dims: usize) -> Result<()> { + conn.execute_batch( + " + -- Conversations table + CREATE TABLE IF NOT EXISTS conversations ( + id TEXT PRIMARY KEY, + platform TEXT NOT NULL, + user_id TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + + -- Messages table + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + conversation_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + tool_calls TEXT, + tool_call_id TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY (conversation_id) REFERENCES conversations(id) + ); + + CREATE INDEX IF NOT EXISTS idx_messages_conversation + ON messages(conversation_id, created_at); + + CREATE INDEX IF NOT EXISTS idx_conversations_user + ON conversations(platform, user_id, updated_at); + + -- Knowledge table + CREATE TABLE IF NOT EXISTS knowledge ( + id TEXT PRIMARY KEY, + category TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT 
NULL, + source TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_key + ON knowledge(category, key); + + -- FTS5 virtual tables for full-text search + CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5( + content, + content=messages, + content_rowid=rowid + ); + + CREATE VIRTUAL TABLE IF NOT EXISTS knowledge_fts USING fts5( + key, + value, + content=knowledge, + content_rowid=rowid + ); + + -- Triggers to keep FTS in sync + CREATE TRIGGER IF NOT EXISTS messages_fts_insert AFTER INSERT ON messages + WHEN NEW.content IS NOT NULL BEGIN + INSERT INTO messages_fts(rowid, content) VALUES (NEW.rowid, NEW.content); + END; + + CREATE TRIGGER IF NOT EXISTS messages_fts_delete AFTER DELETE ON messages + WHEN OLD.content IS NOT NULL BEGIN + INSERT INTO messages_fts(messages_fts, rowid, content) + VALUES('delete', OLD.rowid, OLD.content); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_insert AFTER INSERT ON knowledge BEGIN + INSERT INTO knowledge_fts(rowid, key, value) + VALUES (NEW.rowid, NEW.key, NEW.value); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_delete AFTER DELETE ON knowledge BEGIN + INSERT INTO knowledge_fts(knowledge_fts, rowid, key, value) + VALUES('delete', OLD.rowid, OLD.key, OLD.value); + END; + + CREATE TRIGGER IF NOT EXISTS knowledge_fts_update AFTER UPDATE ON knowledge BEGIN + INSERT INTO knowledge_fts(knowledge_fts, rowid, key, value) + VALUES('delete', OLD.rowid, OLD.key, OLD.value); + INSERT INTO knowledge_fts(rowid, key, value) + VALUES (NEW.rowid, NEW.key, NEW.value); + END; + + -- Schema metadata (e.g. 
embedding dimension for vec tables) + CREATE TABLE IF NOT EXISTS schema_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + ", + )?; + + // Stored embedding dimension (None if legacy DB without schema_meta row) + let raw: Option = conn + .query_row( + "SELECT value FROM schema_meta WHERE key = 'embedding_dims'", + [], + |row| row.get(0), + ) + .optional() + .context("schema_meta query")?; + let stored_dims: Option = raw.and_then(|s| s.parse().ok()); + + let need_migrate = !matches!(stored_dims, Some(s) if s == dims); + + let table_exists = |conn: &Connection, name: &str| -> bool { + conn.query_row( + &format!( + "SELECT count(*) > 0 FROM sqlite_master WHERE type='table' AND name='{}'", + name + ), + [], + |row| row.get(0), + ) + .unwrap_or(false) + }; + + if need_migrate { + // Drop vec tables so we can recreate with new dimension + if table_exists(conn, "message_embeddings") { + conn.execute_batch("DROP TABLE message_embeddings;")?; + } + if table_exists(conn, "knowledge_embeddings") { + conn.execute_batch("DROP TABLE knowledge_embeddings;")?; + } + conn.execute_batch(&format!( + "CREATE VIRTUAL TABLE message_embeddings USING vec0(embedding float[{}]);", + dims + ))?; + conn.execute_batch(&format!( + "CREATE VIRTUAL TABLE knowledge_embeddings USING vec0(embedding float[{}]);", + dims + ))?; + conn.execute( + "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('embedding_dims', ?1)", + [dims.to_string()], + )?; + if let Some(prev_dims) = stored_dims { + info!( + "Embedding dimension changed from {} to {}; vector tables recreated.", + prev_dims, dims + ); + } + } else { + // Create vec tables only if they don't exist (same dimension) + if !table_exists(conn, "message_embeddings") { + conn.execute_batch(&format!( + "CREATE VIRTUAL TABLE message_embeddings USING vec0(embedding float[{}]);", + dims + ))?; + conn.execute( + "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('embedding_dims', ?1)", + [dims.to_string()], + )?; + } + if 
/// A message received from any platform
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct IncomingMessage {
    /// Platform identifier (e.g., "telegram", "discord")
    pub platform: String,
    /// Platform-specific user ID as string
    pub user_id: String,
    /// Platform-specific chat/channel ID as string
    pub chat_id: String,
    /// Display name of the user
    pub user_name: String,
    /// The message text
    pub text: String,
}

/// Split long messages for Telegram's 4096-char limit.
///
/// Prefers to break on a newline, then a space (the break character stays at
/// the end of the chunk). FIX: the original sliced at `start + max_len` bytes,
/// which panics on multi-byte UTF-8 when the cut lands inside a code point —
/// Telegram messages are frequently non-ASCII. We now back the cut off to the
/// nearest char boundary first.
fn split_message(text: &str, max_len: usize) -> Vec<String> {
    if text.len() <= max_len {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let mut end = (start + max_len).min(text.len());
        // Back off to a valid UTF-8 boundary; slicing mid-code-point panics.
        while end > start && !text.is_char_boundary(end) {
            end -= 1;
        }
        if end == start {
            // max_len is smaller than a single code point: take one whole char
            // anyway so the loop always makes progress.
            end = (start + 1..=text.len())
                .find(|&i| text.is_char_boundary(i))
                .unwrap_or(text.len());
        }

        let actual_end = if end < text.len() {
            text[start..end]
                .rfind('\n')
                .or_else(|| text[start..end].rfind(' '))
                .map(|pos| start + pos + 1)
                .unwrap_or(end)
        } else {
            end
        };

        chunks.push(text[start..actual_end].to_string());
        start = actual_end;
    }

    chunks
}
bot_token: &str) -> Result<()> { + let bot = Bot::new(bot_token); + + info!("Starting Telegram platform..."); + + let handler = Update::filter_message() + .filter_map(move |msg: Message| { + let user = msg.from.as_ref()?; + if allowed_user_ids.contains(&user.id.0) { + Some(msg) + } else { + None + } + }) + .endpoint(handle_message); + + Dispatcher::builder(bot, handler) + .dependencies(dptree::deps![agent]) + .default_handler(|upd| async move { + warn!("Unhandled update: {:?}", upd.id); + }) + .error_handler(LoggingErrorHandler::with_custom_text("telegram")) + .build() + .dispatch() + .await; + + Ok(()) +} + +async fn handle_message(bot: Bot, msg: Message, agent: Arc) -> ResponseResult<()> { + let user = match msg.from.as_ref() { + Some(user) => user, + None => return Ok(()), + }; + + let user_id = user.id.0; + let text = match msg.text() { + Some(t) => t.to_string(), + None => return Ok(()), + }; + + let user_name = user.first_name.clone(); + + info!( + "Telegram message from {} ({}): {}", + user_name, user_id, text + ); + + // Handle commands + if text == "/clear" { + if let Err(e) = agent + .clear_conversation("telegram", &user_id.to_string()) + .await + { + error!("Failed to clear conversation: {}", e); + } + bot.send_message(msg.chat.id, "Conversation cleared.") + .await?; + return Ok(()); + } + + if text == "/start" { + bot.send_message( + msg.chat.id, + "Hello! I'm your AI assistant. 
Send me a message and I'll help you.\n\n\ + Commands:\n\ + /clear - Clear conversation history\n\ + /tools - List available tools\n\ + /skills - List loaded skills", + ) + .await?; + return Ok(()); + } + + if text == "/tools" { + let all_tools = agent.all_tool_definitions(); + let mut tool_list = String::from("Available tools:\n\n"); + for tool in &all_tools { + tool_list.push_str(&format!( + " - {}: {}\n", + tool.function.name, tool.function.description + )); + } + bot.send_message(msg.chat.id, tool_list).await?; + return Ok(()); + } + + if text == "/skills" { + let skills = agent.skills.list(); + if skills.is_empty() { + bot.send_message(msg.chat.id, "No skills loaded.").await?; + } else { + let mut skill_list = String::from("Loaded skills:\n\n"); + for skill in &skills { + skill_list.push_str(&format!(" - {}: {}\n", skill.name, skill.description)); + } + bot.send_message(msg.chat.id, skill_list).await?; + } + return Ok(()); + } + + // Send "typing" indicator + bot.send_chat_action(msg.chat.id, teloxide::types::ChatAction::Typing) + .await + .ok(); + + // Build platform-agnostic message + let incoming = IncomingMessage { + platform: "telegram".to_string(), + user_id: user_id.to_string(), + chat_id: msg.chat.id.0.to_string(), + user_name, + text, + }; + + // Process through agent + match agent.process_message(&incoming).await { + Ok(response) => { + for chunk in split_message(&response, 4000) { + bot.send_message(msg.chat.id, chunk).await.ok(); + } + } + Err(e) => { + error!("Error processing message: {:#}", e); + bot.send_message(msg.chat.id, format!("Error: {}", e)) + .await?; + } + } + + Ok(()) +} diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs new file mode 100644 index 0000000..11aa0b8 --- /dev/null +++ b/src/scheduler/mod.rs @@ -0,0 +1,69 @@ +pub mod tasks; + +use anyhow::{Context, Result}; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::info; + +/// Wrapper around tokio-cron-scheduler for background tasks +pub struct Scheduler { + 
inner: JobScheduler, +} + +impl Scheduler { + /// Create a new scheduler + pub async fn new() -> Result { + let inner = JobScheduler::new() + .await + .context("Failed to create job scheduler")?; + Ok(Self { inner }) + } + + /// Add a recurring cron job + pub async fn add_cron_job(&self, cron_expr: &str, name: &str, task: F) -> Result<()> + where + F: Fn() -> std::pin::Pin + Send>> + + Send + + Sync + + 'static, + { + let job_name = name.to_string(); + let job = Job::new_async(cron_expr, move |_uuid, _lock| { + let name = job_name.clone(); + let fut = task(); + Box::pin(async move { + info!("Running scheduled task: {}", name); + fut.await; + }) + }) + .with_context(|| format!("Failed to create cron job: {}", name))?; + + self.inner + .add(job) + .await + .with_context(|| format!("Failed to add job: {}", name))?; + + info!("Scheduled task '{}' with cron: {}", name, cron_expr); + Ok(()) + } + + /// Start the scheduler + pub async fn start(&self) -> Result<()> { + self.inner + .start() + .await + .context("Failed to start scheduler")?; + info!("Scheduler started"); + Ok(()) + } + + /// Shutdown the scheduler + #[allow(dead_code)] + pub async fn shutdown(&mut self) -> Result<()> { + self.inner + .shutdown() + .await + .context("Failed to shutdown scheduler")?; + info!("Scheduler stopped"); + Ok(()) + } +} diff --git a/src/scheduler/tasks.rs b/src/scheduler/tasks.rs new file mode 100644 index 0000000..faae723 --- /dev/null +++ b/src/scheduler/tasks.rs @@ -0,0 +1,21 @@ +use tracing::info; + +use crate::memory::MemoryStore; +use crate::scheduler::Scheduler; + +/// Register built-in background tasks +pub async fn register_builtin_tasks( + scheduler: &Scheduler, + _memory: MemoryStore, +) -> anyhow::Result<()> { + // Heartbeat — log that the bot is alive every hour + scheduler + .add_cron_job("0 0 * * * *", "heartbeat", || { + Box::pin(async { + info!("Heartbeat: bot is alive"); + }) + }) + .await?; + + Ok(()) +} diff --git a/src/skills/loader.rs b/src/skills/loader.rs new 
file mode 100644 index 0000000..b8d3a05 --- /dev/null +++ b/src/skills/loader.rs @@ -0,0 +1,163 @@ +use anyhow::{Context, Result}; +use std::path::Path; +use tracing::{info, warn}; + +use super::{Skill, SkillRegistry}; + +/// Load all markdown skill files from a directory. +/// +/// Supports two formats: +/// 1. `skills/my-skill.md` — standalone markdown file +/// 2. `skills/my-skill/SKILL.md` — directory with a SKILL.md file +/// +/// Skill files can have optional YAML frontmatter: +/// ```markdown +/// --- +/// name: my-skill +/// description: What this skill does +/// tags: [coding, review] +/// --- +/// # Instructions here... +/// ``` +pub async fn load_skills_from_dir(dir: &Path) -> Result { + let mut registry = SkillRegistry::new(); + + if !dir.exists() { + info!("Skills directory not found: {}, skipping", dir.display()); + return Ok(registry); + } + + let mut entries = tokio::fs::read_dir(dir) + .await + .with_context(|| format!("Failed to read skills directory: {}", dir.display()))?; + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + + // Support .md files and directories containing SKILL.md + let skill_path = if path.is_dir() { + let skill_file = path.join("SKILL.md"); + if skill_file.exists() { + skill_file + } else { + continue; + } + } else if path.extension().and_then(|e| e.to_str()) == Some("md") { + path.clone() + } else { + continue; + }; + + match load_skill_file(&skill_path).await { + Ok(skill) => registry.register(skill), + Err(e) => warn!("Failed to load skill from {}: {}", skill_path.display(), e), + } + } + + info!("Loaded {} skills", registry.len()); + Ok(registry) +} + +async fn load_skill_file(path: &Path) -> Result { + let content = tokio::fs::read_to_string(path) + .await + .with_context(|| format!("Failed to read skill file: {}", path.display()))?; + + // Try to parse YAML frontmatter + if let Some(stripped) = content.strip_prefix("---") { + if let Some(end) = stripped.find("---") { + let frontmatter = stripped[..end].trim(); + let body = stripped[end + 3..].trim().to_string(); + + let name = extract_field(frontmatter, "name"); + let description = extract_field(frontmatter, "description"); + let tags = extract_list_field(frontmatter, "tags"); + + let skill_name = name.unwrap_or_else(|| name_from_path(path)); + + return Ok(Skill { + name: skill_name, + description: description.unwrap_or_else(|| first_line_or_heading(&body)), + content: body, + tags, + }); + } + } + + // No frontmatter — derive metadata from content + let name = name_from_path(path); + let description = first_line_or_heading(&content); + + Ok(Skill { + name, + description, + content: content.to_string(), + tags: Vec::new(), + }) +} + +/// Extract a simple `key: value` from YAML-like frontmatter +fn extract_field(frontmatter: &str, key: &str) -> Option { + let prefix = format!("{}:", key); + for line in frontmatter.lines() { + let line = line.trim(); + if let Some(rest) = line.strip_prefix(&prefix) { + let value = rest.trim().trim_matches('"').trim_matches('\''); + if 
/// Extract a simple `key: value` entry from YAML-like frontmatter.
/// Surrounding double or single quotes are stripped; empty values are skipped
/// so a later matching line can still supply the value.
fn extract_field(frontmatter: &str, key: &str) -> Option<String> {
    let prefix = format!("{}:", key);
    frontmatter.lines().find_map(|raw| {
        let rest = raw.trim().strip_prefix(&prefix)?;
        let value = rest.trim().trim_matches('"').trim_matches('\'');
        if value.is_empty() {
            None
        } else {
            Some(value.to_string())
        }
    })
}

/// Extract a simple `key: [a, b, c]` list from frontmatter.
/// Lines matching the key but not bracketed are ignored; returns an empty
/// vector when no bracketed list is found.
fn extract_list_field(frontmatter: &str, key: &str) -> Vec<String> {
    let prefix = format!("{}:", key);
    frontmatter
        .lines()
        .find_map(|raw| {
            let rest = raw.trim().strip_prefix(&prefix)?.trim();
            let inner = rest.strip_prefix('[')?.strip_suffix(']')?;
            Some(
                inner
                    .split(',')
                    .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
                    .filter(|s| !s.is_empty())
                    .collect(),
            )
        })
        .unwrap_or_default()
}

/// Derive a skill name from its file path: the directory name for
/// `…/dir/SKILL.md`, otherwise the file stem.
fn name_from_path(path: &Path) -> String {
    if path.file_name().and_then(|f| f.to_str()) == Some("SKILL.md") {
        if let Some(dir_name) = path
            .parent()
            .and_then(|p| p.file_name())
            .and_then(|f| f.to_str())
        {
            return dir_name.to_string();
        }
    }
    path.file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("unnamed")
        .to_string()
}

/// Use the first non-empty line as a description, stripping any leading
/// markdown heading markers.
fn first_line_or_heading(content: &str) -> String {
    content
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .map(|line| match line.strip_prefix('#') {
            Some(heading) => heading.trim().trim_start_matches('#').trim().to_string(),
            None => line.to_string(),
        })
        .unwrap_or_else(|| "No description".to_string())
}
instructions) + pub content: String, + /// Category/tags for organization + pub tags: Vec, +} + +/// Registry of all loaded skills +#[derive(Debug, Clone)] +pub struct SkillRegistry { + skills: HashMap, +} + +impl SkillRegistry { + pub fn new() -> Self { + Self { + skills: HashMap::new(), + } + } + + /// Register a skill + pub fn register(&mut self, skill: Skill) { + info!("Registered skill: {} — {}", skill.name, skill.description); + self.skills.insert(skill.name.clone(), skill); + } + + /// Get a skill by name + #[allow(dead_code)] + pub fn get(&self, name: &str) -> Option<&Skill> { + self.skills.get(name) + } + + /// List all registered skills + pub fn list(&self) -> Vec<&Skill> { + self.skills.values().collect() + } + + /// Build context string for the system prompt. + /// Gives the LLM awareness of all available skills. + pub fn build_context(&self) -> String { + if self.skills.is_empty() { + return String::new(); + } + + let mut context = String::from( + "You have the following skills available. When relevant, follow these instructions:\n\n", + ); + for skill in self.skills.values() { + context.push_str(&format!("## Skill: {}\n", skill.name)); + context.push_str(&format!("{}\n\n", skill.content)); + } + context + } + + pub fn len(&self) -> usize { + self.skills.len() + } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.skills.is_empty() + } +} diff --git a/src/tools.rs b/src/tools.rs index 0844a51..e283011 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -30,11 +30,7 @@ fn validate_sandbox_path(sandbox_dir: &Path, requested: &str) -> Result let parent_canonical = parent .canonicalize() .with_context(|| format!("Parent directory not found: {}", parent.display()))?; - parent_canonical.join( - requested_path - .file_name() - .context("Path has no filename")?, - ) + parent_canonical.join(requested_path.file_name().context("Path has no filename")?) 
}; if !check_path.starts_with(&sandbox_canonical) { @@ -157,16 +153,19 @@ pub async fn execute_builtin_tool( // Create parent directories if they don't exist if let Some(parent) = full_path.parent() { - tokio::fs::create_dir_all(parent) - .await - .with_context(|| format!("Failed to create directories: {}", parent.display()))?; + tokio::fs::create_dir_all(parent).await.with_context(|| { + format!("Failed to create directories: {}", parent.display()) + })?; } info!("Writing file: {}", full_path.display()); tokio::fs::write(&full_path, content) .await .with_context(|| format!("Failed to write file: {}", full_path.display()))?; - Ok(format!("File written successfully: {}", full_path.display())) + Ok(format!( + "File written successfully: {}", + full_path.display() + )) } "list_files" => { let path = arguments @@ -183,7 +182,11 @@ pub async fn execute_builtin_tool( while let Some(entry) = read_dir.next_entry().await? { let file_type = entry.file_type().await?; - let prefix = if file_type.is_dir() { "[DIR]" } else { "[FILE]" }; + let prefix = if file_type.is_dir() { + "[DIR]" + } else { + "[FILE]" + }; entries.push(format!( "{} {}", prefix,