From 9cfde0510ece951e84161efcfada44ac16de4c0a Mon Sep 17 00:00:00 2001 From: wsp Date: Mon, 16 Mar 2026 20:00:00 +0800 Subject: [PATCH 1/3] feat: add SessionHistory tool --- src/apps/cli/src/agent/core_adapter.rs | 1 + src/apps/desktop/src/api/mod.rs | 2 +- src/apps/desktop/src/api/session_api.rs | 46 +- src/apps/desktop/src/lib.rs | 1 + .../core/src/agentic/agents/claw_mode.rs | 1 + src/crates/core/src/agentic/agents/mod.rs | 5 +- .../agents/prompt_builder/prompt_builder.rs | 7 +- .../src/agentic/execution/execution_engine.rs | 15 - .../core/src/agentic/persistence/manager.rs | 962 +++++++++++++++++- .../tools/implementations/bash_tool.rs | 1 + .../src/agentic/tools/implementations/mod.rs | 2 + .../implementations/session_control_tool.rs | 52 +- .../implementations/session_history_tool.rs | 318 ++++++ .../agentic/tools/pipeline/state_manager.rs | 7 + src/crates/core/src/agentic/tools/registry.rs | 1 + .../src/service/agent_memory/agent_memory.rs | 168 +-- src/crates/core/src/service/session/types.rs | 85 +- src/crates/events/src/agentic.rs | 2 + .../flow-chat-manager/ToolEventModule.ts | 1 + src/web-ui/src/flow_chat/types/flow-chat.ts | 1 + .../src/shared/types/session-history.ts | 1 + src/web-ui/src/shared/types/tool-events.ts | 1 + 22 files changed, 1489 insertions(+), 191 deletions(-) create mode 100644 src/crates/core/src/agentic/tools/implementations/session_history_tool.rs diff --git a/src/apps/cli/src/agent/core_adapter.rs b/src/apps/cli/src/agent/core_adapter.rs index 095d5e3a..1c491c7a 100644 --- a/src/apps/cli/src/agent/core_adapter.rs +++ b/src/apps/cli/src/agent/core_adapter.rs @@ -279,6 +279,7 @@ impl Agent for CoreAgentAdapter { tool_id, tool_name, result, + result_for_assistant: _, duration_ms, } => { let result_str = serde_json::to_string(&result) diff --git a/src/apps/desktop/src/api/mod.rs b/src/apps/desktop/src/api/mod.rs index bdc521a5..84efbc1c 100644 --- a/src/apps/desktop/src/api/mod.rs +++ b/src/apps/desktop/src/api/mod.rs @@ -2,9 +2,9 @@ pub mod agentic_api; pub mod ai_memory_api; -pub mod browser_api; pub mod ai_rules_api; pub mod app_state; +pub mod browser_api; pub mod btw_api; pub mod clipboard_file_api; pub mod commands; diff --git a/src/apps/desktop/src/api/session_api.rs b/src/apps/desktop/src/api/session_api.rs index 8f00d297..c7c5d3ed 100644 --- a/src/apps/desktop/src/api/session_api.rs +++ b/src/apps/desktop/src/api/session_api.rs @@ -2,7 +2,9 @@ use bitfun_core::agentic::persistence::PersistenceManager; use bitfun_core::infrastructure::PathManager; -use bitfun_core::service::session::{DialogTurnData, SessionMetadata}; +use bitfun_core::service::session::{ + DialogTurnData, SessionMetadata, SessionTranscriptExport, SessionTranscriptExportOptions, +}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Arc; @@ -33,6 +35,24 @@ pub struct SaveSessionMetadataRequest { pub workspace_path: String, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExportSessionTranscriptRequest { + pub session_id: String, + pub workspace_path: String, + #[serde(default = "default_tools")] + pub tools: bool, + #[serde(default)] + pub tool_inputs: bool, + #[serde(default)] + pub thinking: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub turns: Option>, +} + +fn default_tools() -> bool { + false +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeletePersistedSessionRequest { pub session_id: String, @@ -118,6 +138,30 @@ pub async fn save_session_metadata( .map_err(|e| format!("Failed to save session metadata: {}", e)) } +#[tauri::command] +pub async fn export_session_transcript( + request: ExportSessionTranscriptRequest, + path_manager: State<'_, Arc>, +) -> Result { + let workspace_path = PathBuf::from(&request.workspace_path); + let manager = PersistenceManager::new(path_manager.inner().clone()) + .map_err(|e| format!("Failed to create persistence manager: {}", e))?; + + manager + .export_session_transcript( + &workspace_path, + &request.session_id, + &SessionTranscriptExportOptions { + tools: request.tools, + tool_inputs: request.tool_inputs, + thinking: request.thinking, + turns: request.turns, + }, + ) + .await + .map_err(|e| format!("Failed to export session transcript: {}", e)) +} + #[tauri::command] pub async fn delete_persisted_session( request: DeletePersistedSessionRequest, diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 0ad2f237..0a85fbbc 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -468,6 +468,7 @@ pub async fn run() { load_session_turns, save_session_turn, save_session_metadata, + export_session_transcript, delete_persisted_session, touch_session_activity, load_persisted_session_metadata, diff --git a/src/crates/core/src/agentic/agents/claw_mode.rs b/src/crates/core/src/agentic/agents/claw_mode.rs index 398779b5..8e39cc25 100644 --- a/src/crates/core/src/agentic/agents/claw_mode.rs +++ b/src/crates/core/src/agentic/agents/claw_mode.rs @@ -27,6 +27,7 @@ impl ClawMode { "TerminalControl".to_string(), "SessionControl".to_string(), "SessionMessage".to_string(), + "SessionHistory".to_string(), ], } } diff --git a/src/crates/core/src/agentic/agents/mod.rs b/src/crates/core/src/agentic/agents/mod.rs index fe60b769..f007a6bc 100644 --- a/src/crates/core/src/agentic/agents/mod.rs +++ b/src/crates/core/src/agentic/agents/mod.rs @@ -66,8 +66,9 @@ pub trait Agent: Send + Sync + 'static { async fn build_prompt(&self, context: &PromptBuilderContext) -> BitFunResult { let prompt_components = PromptBuilder::new(context.clone()); let template_name = self.prompt_template_name(context.model_name.as_deref()); - let system_prompt_template = get_embedded_prompt(template_name) - .ok_or_else(|| BitFunError::Agent(format!("{} not found in embedded files", template_name)))?; + let system_prompt_template = get_embedded_prompt(template_name).ok_or_else(|| { + BitFunError::Agent(format!("{} not found in embedded files", template_name)) + })?; let prompt = prompt_components .build_prompt_from_template(system_prompt_template) diff --git a/src/crates/core/src/agentic/agents/prompt_builder/prompt_builder.rs b/src/crates/core/src/agentic/agents/prompt_builder/prompt_builder.rs index 05f2ee5d..c51a1145 100644 --- a/src/crates/core/src/agentic/agents/prompt_builder/prompt_builder.rs +++ b/src/crates/core/src/agentic/agents/prompt_builder/prompt_builder.rs @@ -349,12 +349,7 @@ Do not read from, modify, create, move, or delete files outside this workspace u // Replace {AGENT_MEMORY} if result.contains(PLACEHOLDER_AGENT_MEMORY) { let workspace = Path::new(&self.context.workspace_path); - let agent_memory = match build_workspace_agent_memory_prompt( - workspace, - self.context.session_id.as_deref(), - ) - .await - { + let agent_memory = match build_workspace_agent_memory_prompt(workspace).await { Ok(prompt) => prompt, Err(e) => { warn!( diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 0327a751..a36f72e1 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -1137,19 +1137,4 @@ mod tests { "model-primary" ); } - - #[test] - fn invalid_locked_auto_model_is_ignored() { - let mut ai_config = AIConfig::default(); - ai_config.models = vec![build_model("model-primary", "Primary", "claude-sonnet-4.5")]; - ai_config.default_models.primary = Some("model-primary".to_string()); - - assert_eq!( - ExecutionEngine::resolve_locked_auto_model_id( - &ai_config, - Some(&"deleted-fast-model".to_string()) - ), - None - ); - } } diff --git a/src/crates/core/src/agentic/persistence/manager.rs b/src/crates/core/src/agentic/persistence/manager.rs index 372199e1..32edcdb7 100644 --- a/src/crates/core/src/agentic/persistence/manager.rs +++ b/src/crates/core/src/agentic/persistence/manager.rs @@ -4,13 +4,18 @@ //! message/compression persistence used by in-memory managers. use crate::agentic::core::{ - CompressionState, Message, MessageContent, Session, SessionConfig, SessionState, SessionSummary, + strip_prompt_markup, CompressionState, Message, MessageContent, Session, SessionConfig, + SessionState, SessionSummary, }; use crate::infrastructure::PathManager; -use crate::service::session::{DialogTurnData, SessionMetadata, SessionStatus}; +use crate::service::session::{ + DialogTurnData, SessionMetadata, SessionStatus, SessionTranscriptExport, + SessionTranscriptExportOptions, SessionTranscriptIndexEntry, ToolItemData, TranscriptLineRange, +}; use crate::util::errors::{BitFunError, BitFunResult}; use log::{debug, info, warn}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::io::ErrorKind; use std::path::{Path, PathBuf}; @@ -21,8 +26,10 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::Mutex; const SESSION_SCHEMA_VERSION: u32 = 2; +const TRANSCRIPT_SCHEMA_VERSION: u32 = 1; const JSON_WRITE_MAX_RETRIES: usize = 5; const JSON_WRITE_RETRY_BASE_DELAY_MS: u64 = 30; +const SESSION_TRANSCRIPT_PREVIEW_CHAR_LIMIT: usize = 120; static JSON_FILE_WRITE_LOCKS: OnceLock>>>> = OnceLock::new(); static SESSION_INDEX_LOCKS: OnceLock>>>> = OnceLock::new(); @@ -65,6 +72,97 @@ struct StoredSessionIndex { sessions: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct StoredSessionTranscriptFile { + schema_version: u32, + #[serde(flatten)] + transcript: SessionTranscriptExport, +} + +#[derive(Debug, Clone, Serialize)] +struct TranscriptFingerprintPayload { + session_id: String, + tools: bool, + tool_inputs: bool, + thinking: bool, + turn_selectors: Option>, + turns: Vec, +} + +#[derive(Debug, Clone, Serialize)] +struct TranscriptFingerprintTurn { + turn_id: String, + turn_index: usize, + status: String, + user: String, + assistant: Vec, + tools: Vec, + thinking: Vec, +} + +#[derive(Debug, Clone, Serialize)] +struct TranscriptFingerprintTextBlock { + round_index: usize, + content: String, +} + +#[derive(Debug, Clone, Serialize)] +struct TranscriptFingerprintTool { + tool_name: String, + tool_input: Option, + result: Option, +} + +#[derive(Debug, Clone)] +struct TranscriptTextBlock { + round_index: usize, + content: String, +} + +#[derive(Debug, Clone)] +struct TranscriptToolBlock { + tool_name: String, + tool_input: Option, + result: Option, +} + +#[derive(Debug, Clone)] +enum TranscriptRoundBlock { + Thinking(String), + Assistant(String), + Tool(TranscriptToolBlock), +} + +#[derive(Debug, Clone)] +struct TranscriptRoundData { + round_index: usize, + blocks: Vec, +} + +#[derive(Debug, Clone)] +struct TranscriptSectionData { + turn_index: usize, + preview: String, + lines: Vec, + turn_range: TranscriptLineRange, + user_range: TranscriptLineRange, +} + +#[derive(Debug, Clone, Copy)] +enum TranscriptTurnSelector { + Index(isize), + Slice { + start: Option, + end: Option, + }, +} + +#[derive(Debug, Clone)] +struct ParsedTranscriptTurnSelector { + normalized: String, + selector: TranscriptTurnSelector, +} + pub struct PersistenceManager { path_manager: Arc, } @@ -106,6 +204,11 @@ impl PersistenceManager { .join("snapshots") } + fn artifacts_dir(&self, workspace_path: &Path, session_id: &str) -> PathBuf { + self.session_dir(workspace_path, session_id) + .join("artifacts") + } + fn turn_path(&self, workspace_path: &Path, session_id: &str, turn_index: usize) -> PathBuf { self.turns_dir(workspace_path, session_id) .join(format!("turn-{:04}.json", turn_index)) @@ -121,6 +224,16 @@ impl PersistenceManager { .join(format!("context-{:04}.json", turn_index)) } + fn transcript_path(&self, workspace_path: &Path, session_id: &str) -> PathBuf { + self.artifacts_dir(workspace_path, session_id) + .join("transcript.txt") + } + + fn transcript_meta_path(&self, workspace_path: &Path, session_id: &str) -> PathBuf { + self.artifacts_dir(workspace_path, session_id) + .join("transcript.meta.json") + } + fn index_path(&self, workspace_path: &Path) -> PathBuf { self.project_sessions_dir(workspace_path).join("index.json") } @@ -172,6 +285,18 @@ impl PersistenceManager { Ok(dir) } + async fn ensure_artifacts_dir( + &self, + workspace_path: &Path, + session_id: &str, + ) -> BitFunResult { + let dir = self.artifacts_dir(workspace_path, session_id); + fs::create_dir_all(&dir) + .await + .map_err(|e| BitFunError::io(format!("Failed to create artifacts directory: {}", e)))?; + Ok(dir) + } + async fn read_json_optional( &self, path: &Path, @@ -479,6 +604,523 @@ impl PersistenceManager { } } + fn turn_status_label(status: &crate::service::session::TurnStatus) -> &'static str { + match status { + crate::service::session::TurnStatus::InProgress => "inprogress", + crate::service::session::TurnStatus::Completed => "completed", + crate::service::session::TurnStatus::Error => "error", + crate::service::session::TurnStatus::Cancelled => "cancelled", + } + } + + fn transcript_preview(content: &str) -> String { + let normalized = content.split_whitespace().collect::>().join(" "); + if normalized.is_empty() { + return "(empty user message)".to_string(); + } + + let mut preview: String = normalized + .chars() + .take(SESSION_TRANSCRIPT_PREVIEW_CHAR_LIMIT) + .collect(); + if normalized.chars().count() > SESSION_TRANSCRIPT_PREVIEW_CHAR_LIMIT { + preview.push_str("..."); + } + preview + } + + fn transcript_text_lines(content: &str) -> Vec { + if content.is_empty() { + return vec!["(empty)".to_string()]; + } + + let lines = content + .lines() + .map(|line| line.to_string()) + .collect::>(); + if lines.is_empty() { + vec!["(empty)".to_string()] + } else { + lines + } + } + + fn transcript_value_string(value: &serde_json::Value) -> String { + match value { + serde_json::Value::String(text) => text.clone(), + _ => serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string()), + } + } + + fn transcript_tool_input(item: &ToolItemData, tool_inputs: bool) -> Option { + if !tool_inputs || item.tool_call.input.is_null() { + return None; + } + + Some(Self::transcript_value_string(&item.tool_call.input)) + } + + fn transcript_tool_result(item: &ToolItemData) -> Option { + item.tool_result.as_ref().and_then(|result| { + result + .result_for_assistant + .as_ref() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .or_else(|| { + if result.result.is_null() { + None + } else { + Some(Self::transcript_value_string(&result.result)) + } + }) + }) + } + + fn transcript_display_user_content(turn: &DialogTurnData) -> String { + turn.user_message + .metadata + .as_ref() + .and_then(|metadata| metadata.get("original_text")) + .and_then(|value| value.as_str()) + .map(str::to_string) + .unwrap_or_else(|| strip_prompt_markup(&turn.user_message.content)) + } + + fn transcript_assistant_blocks(turn: &DialogTurnData) -> Vec { + turn.model_rounds + .iter() + .filter_map(|round| { + let content = round + .text_items + .iter() + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| item.content.trim()) + .filter(|value| !value.is_empty()) + .collect::>() + .join("\n\n"); + if content.is_empty() { + None + } else { + Some(TranscriptTextBlock { + round_index: round.round_index, + content, + }) + } + }) + .collect() + } + + fn transcript_thinking_blocks(turn: &DialogTurnData) -> Vec { + turn.model_rounds + .iter() + .filter_map(|round| { + let content = round + .thinking_items + .iter() + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| item.content.trim()) + .filter(|value| !value.is_empty()) + .collect::>() + .join("\n\n"); + if content.is_empty() { + None + } else { + Some(TranscriptTextBlock { + round_index: round.round_index, + content, + }) + } + }) + .collect() + } + + fn transcript_tool_blocks( + turn: &DialogTurnData, + tool_inputs: bool, + ) -> Vec { + turn.model_rounds + .iter() + .flat_map(|round| round.tool_items.iter()) + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| TranscriptToolBlock { + tool_name: item.tool_name.clone(), + tool_input: Self::transcript_tool_input(item, tool_inputs), + result: Self::transcript_tool_result(item), + }) + .collect() + } + + fn transcript_round_blocks( + turn: &DialogTurnData, + options: &SessionTranscriptExportOptions, + ) -> Vec { + turn.model_rounds + .iter() + .filter_map(|round| { + let thinking_content = if options.thinking { + round + .thinking_items + .iter() + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| item.content.trim()) + .filter(|value| !value.is_empty()) + .collect::>() + .join("\n\n") + } else { + String::new() + }; + + let assistant_content = round + .text_items + .iter() + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| item.content.trim()) + .filter(|value| !value.is_empty()) + .collect::>() + .join("\n\n"); + + let tool_blocks = if options.tools { + round + .tool_items + .iter() + .filter(|item| !item.is_subagent_item.unwrap_or(false)) + .map(|item| TranscriptToolBlock { + tool_name: item.tool_name.clone(), + tool_input: Self::transcript_tool_input(item, options.tool_inputs), + result: Self::transcript_tool_result(item), + }) + .collect::>() + } else { + Vec::new() + }; + + if thinking_content.is_empty() + && assistant_content.is_empty() + && tool_blocks.is_empty() + { + return None; + } + + let mut blocks = Vec::new(); + if !thinking_content.is_empty() { + blocks.push(TranscriptRoundBlock::Thinking(thinking_content)); + } + if !assistant_content.is_empty() { + blocks.push(TranscriptRoundBlock::Assistant(assistant_content)); + } + for tool in tool_blocks { + blocks.push(TranscriptRoundBlock::Tool(tool)); + } + + Some(TranscriptRoundData { + round_index: round.round_index, + blocks, + }) + }) + .collect() + } + + fn transcript_fingerprint( + session_id: &str, + turns: &[DialogTurnData], + options: &SessionTranscriptExportOptions, + ) -> BitFunResult { + let payload = TranscriptFingerprintPayload { + session_id: session_id.to_string(), + tools: options.tools, + tool_inputs: options.tool_inputs, + thinking: options.thinking, + turn_selectors: options.turns.clone(), + turns: turns + .iter() + .map(|turn| TranscriptFingerprintTurn { + turn_id: turn.turn_id.clone(), + turn_index: turn.turn_index, + status: Self::turn_status_label(&turn.status).to_string(), + user: Self::transcript_display_user_content(turn), + assistant: Self::transcript_assistant_blocks(turn) + .into_iter() + .map(|block| TranscriptFingerprintTextBlock { + round_index: block.round_index, + content: block.content, + }) + .collect(), + tools: if options.tools { + Self::transcript_tool_blocks(turn, options.tool_inputs) + .into_iter() + .map(|tool| TranscriptFingerprintTool { + tool_name: tool.tool_name, + tool_input: tool.tool_input, + result: tool.result, + }) + .collect() + } else { + Vec::new() + }, + thinking: if options.thinking { + Self::transcript_thinking_blocks(turn) + .into_iter() + .map(|block| TranscriptFingerprintTextBlock { + round_index: block.round_index, + content: block.content, + }) + .collect() + } else { + Vec::new() + }, + }) + .collect(), + }; + + let bytes = serde_json::to_vec(&payload).map_err(|e| { + BitFunError::serialization(format!("Failed to serialize transcript fingerprint: {}", e)) + })?; + let mut hasher = Sha256::new(); + hasher.update(bytes); + Ok(format!("{:x}", hasher.finalize())) + } + + fn push_transcript_block( + lines: &mut Vec, + label: &str, + body_lines: Vec, + ) -> TranscriptLineRange { + let start_line = lines.len() + 1; + lines.push(format!("[{}]", label)); + lines.extend(body_lines); + lines.push(format!("[/{}]", label)); + TranscriptLineRange { + start_line, + end_line: lines.len(), + } + } + + fn build_transcript_section( + turn: &DialogTurnData, + options: &SessionTranscriptExportOptions, + ) -> TranscriptSectionData { + let user_content = Self::transcript_display_user_content(turn); + let round_blocks = Self::transcript_round_blocks(turn, options); + + let mut lines = Vec::new(); + lines.push(format!("## Turn {}", turn.turn_index)); + lines.push(String::new()); + + let user_range = Self::push_transcript_block( + &mut lines, + "user", + Self::transcript_text_lines(&user_content), + ); + + if !round_blocks.is_empty() { + lines.push(String::new()); + for (round_index, round) in round_blocks.iter().enumerate() { + lines.push(format!("[assistant_round {}]", round.round_index)); + for (block_index, block) in round.blocks.iter().enumerate() { + match block { + TranscriptRoundBlock::Thinking(content) => { + lines.push("[thinking]".to_string()); + lines.extend(Self::transcript_text_lines(content)); + lines.push("[/thinking]".to_string()); + } + TranscriptRoundBlock::Assistant(content) => { + lines.push("[text]".to_string()); + lines.extend(Self::transcript_text_lines(content)); + lines.push("[/text]".to_string()); + } + TranscriptRoundBlock::Tool(tool) => { + lines.push("[tool]".to_string()); + lines.push(format!("name: {}", tool.tool_name)); + if let Some(tool_input) = tool.tool_input.as_ref() { + lines.push("input:".to_string()); + lines.extend(Self::transcript_text_lines(tool_input)); + } + if let Some(result) = tool.result.as_ref() { + lines.push("result:".to_string()); + lines.extend(Self::transcript_text_lines(result)); + } + lines.push("[/tool]".to_string()); + } + } + + if block_index + 1 < round.blocks.len() { + lines.push(String::new()); + } + } + lines.push(format!("[/assistant_round {}]", round.round_index)); + if round_index + 1 < round_blocks.len() { + lines.push(String::new()); + } + } + } + + TranscriptSectionData { + turn_index: turn.turn_index, + preview: Self::transcript_preview(&user_content), + turn_range: TranscriptLineRange { + start_line: 1, + end_line: lines.len(), + }, + user_range, + lines, + } + } + + fn offset_range(range: &TranscriptLineRange, offset: usize) -> TranscriptLineRange { + TranscriptLineRange { + start_line: range.start_line + offset, + end_line: range.end_line + offset, + } + } + + fn format_range(range: &TranscriptLineRange) -> String { + format!("{}-{}", range.start_line, range.end_line) + } + + fn parse_transcript_turn_selectors( + selectors: &[String], + ) -> BitFunResult> { + if selectors.is_empty() { + return Err(BitFunError::Validation( + "turns cannot be an empty array".to_string(), + )); + } + + selectors + .iter() + .map(|selector| Self::parse_transcript_turn_selector(selector)) + .collect() + } + + fn parse_transcript_turn_selector( + selector: &str, + ) -> BitFunResult { + let normalized = selector.trim(); + if normalized.is_empty() { + return Err(BitFunError::Validation( + "turns cannot contain empty selectors".to_string(), + )); + } + + if normalized.matches(':').count() > 1 { + return Err(BitFunError::Validation(format!( + "Invalid turn selector '{}'. Use forms like ':20', '-20:', '10:30', or '15'.", + normalized + ))); + } + + let selector = if let Some((start, end)) = normalized.split_once(':') { + TranscriptTurnSelector::Slice { + start: if start.is_empty() { + None + } else { + Some(Self::parse_transcript_turn_value(start, normalized)?) + }, + end: if end.is_empty() { + None + } else { + Some(Self::parse_transcript_turn_value(end, normalized)?) + }, + } + } else { + TranscriptTurnSelector::Index(Self::parse_transcript_turn_value( + normalized, normalized, + )?) + }; + + Ok(ParsedTranscriptTurnSelector { + normalized: normalized.to_string(), + selector, + }) + } + + fn parse_transcript_turn_value(value: &str, selector: &str) -> BitFunResult { + value.parse::().map_err(|_| { + BitFunError::Validation(format!( + "Invalid turn selector '{}'. Use forms like ':20', '-20:', '10:30', or '15'.", + selector + )) + }) + } + + fn transcript_normalize_slice_bound( + total: usize, + bound: Option, + default: usize, + ) -> usize { + let Some(bound) = bound else { + return default; + }; + + let total = total as isize; + let normalized = if bound < 0 { + total.saturating_add(bound) + } else { + bound + }; + normalized.clamp(0, total) as usize + } + + fn transcript_normalize_index(total: usize, index: isize) -> Option { + let total = total as isize; + let normalized = if index < 0 { + total.saturating_add(index) + } else { + index + }; + + if normalized < 0 || normalized >= total { + None + } else { + Some(normalized as usize) + } + } + + fn transcript_select_turn_indices( + total: usize, + selectors: &[ParsedTranscriptTurnSelector], + ) -> Vec { + let mut selected = vec![false; total]; + + for selector in selectors { + match selector.selector { + TranscriptTurnSelector::Index(index) => { + if let Some(index) = Self::transcript_normalize_index(total, index) { + selected[index] = true; + } + } + TranscriptTurnSelector::Slice { start, end } => { + let start = Self::transcript_normalize_slice_bound(total, start, 0); + let end = Self::transcript_normalize_slice_bound(total, end, total); + if start < end { + selected[start..end].fill(true); + } + } + } + } + + selected + .into_iter() + .enumerate() + .filter_map(|(index, is_selected)| is_selected.then_some(index)) + .collect() + } + + fn transcript_omitted_turns_label( + turns: &[DialogTurnData], + start: usize, + end: usize, + ) -> String { + let start_turn = turns[start].turn_index; + let end_turn = turns[end].turn_index; + if start_turn == end_turn { + format!("(omitted turn {})", start_turn) + } else { + format!("(omitted turns {}-{})", start_turn, end_turn) + } + } + async fn rebuild_index_locked( &self, workspace_path: &Path, @@ -1105,6 +1747,198 @@ impl PersistenceManager { Ok(turns[start..].to_vec()) } + pub async fn export_session_transcript( + &self, + workspace_path: &Path, + session_id: &str, + options: &SessionTranscriptExportOptions, + ) -> BitFunResult { + if self + .load_session_metadata(workspace_path, session_id) + .await? + .is_none() + { + return Err(BitFunError::NotFound(format!( + "Session metadata not found: {}", + session_id + ))); + } + + let transcript_path = self.transcript_path(workspace_path, session_id); + let transcript_meta_path = self.transcript_meta_path(workspace_path, session_id); + + let parsed_turn_selectors = options + .turns + .as_ref() + .map(|selectors| Self::parse_transcript_turn_selectors(selectors)) + .transpose()?; + let normalized_options = SessionTranscriptExportOptions { + tools: options.tools, + tool_inputs: options.tool_inputs, + thinking: options.thinking, + turns: parsed_turn_selectors.as_ref().map(|selectors| { + selectors + .iter() + .map(|selector| selector.normalized.clone()) + .collect() + }), + }; + + let all_turns = self.load_session_turns(workspace_path, session_id).await?; + let selected_indices = parsed_turn_selectors + .as_ref() + .map(|selectors| Self::transcript_select_turn_indices(all_turns.len(), selectors)) + .unwrap_or_else(|| (0..all_turns.len()).collect::>()); + let turns = selected_indices + .iter() + .map(|&index| all_turns[index].clone()) + .collect::>(); + + let source_fingerprint = + Self::transcript_fingerprint(session_id, &turns, &normalized_options)?; + if transcript_path.exists() { + if let Some(stored) = self + .read_json_optional::(&transcript_meta_path) + .await? + { + if stored.transcript.source_fingerprint == source_fingerprint + && stored.transcript.index_range.start_line > 0 + && stored.transcript.index_range.end_line > 0 + { + return Ok(stored.transcript); + } + } + } + + self.ensure_artifacts_dir(workspace_path, session_id) + .await?; + + let generated_at = Self::system_time_to_unix_ms(SystemTime::now()); + let sections = selected_indices + .iter() + .map(|&index| { + ( + index, + Self::build_transcript_section(&all_turns[index], &normalized_options), + ) + }) + .collect::>(); + + let mut lines = vec!["## Index".to_string()]; + + let mut index = Vec::with_capacity(sections.len()); + if sections.is_empty() { + lines.push(if all_turns.is_empty() { + "(no persisted turns)".to_string() + } else { + "(no matching turns)".to_string() + }); + } else { + let index_offset = lines.len() + sections.len() + 1; + let mut body_lines = Vec::new(); + + for (position, (source_index, section)) in sections.iter().enumerate() { + let omitted_range = if position == 0 { + (*source_index > 0).then(|| (0, *source_index - 1)) + } else { + let previous_index = sections[position - 1].0; + (*source_index > previous_index + 1) + .then(|| (previous_index + 1, *source_index - 1)) + }; + + if let Some((start, end)) = omitted_range { + if !body_lines.is_empty() { + body_lines.push(String::new()); + } + body_lines.push(Self::transcript_omitted_turns_label(&all_turns, start, end)); + body_lines.push(String::new()); + } else if !body_lines.is_empty() { + body_lines.push(String::new()); + } + + let section_offset = index_offset + body_lines.len(); + let turn_range = Self::offset_range(§ion.turn_range, section_offset); + let user_range = Self::offset_range(§ion.user_range, section_offset); + + let index_line = format!( + "- turn={} range={} preview=\"{}\"", + section.turn_index, + Self::format_range(&turn_range), + section.preview.replace('"', "'") + ); + lines.push(index_line); + + index.push(SessionTranscriptIndexEntry { + turn_index: section.turn_index, + preview: section.preview.clone(), + turn_range, + user_range, + }); + + body_lines.extend(section.lines.iter().cloned()); + } + + if let Some((last_index, _)) = sections.last() { + if *last_index + 1 < all_turns.len() { + body_lines.push(String::new()); + body_lines.push(Self::transcript_omitted_turns_label( + &all_turns, + *last_index + 1, + all_turns.len() - 1, + )); + } + } + + lines.push(String::new()); + lines.extend(body_lines); + } + + let index_range = TranscriptLineRange { + start_line: 1, + end_line: lines + .iter() + .position(|line| line.is_empty()) + .unwrap_or(lines.len()), + }; + + let transcript_content = lines.join("\n"); + fs::write(&transcript_path, transcript_content) + .await + .map_err(|e| { + BitFunError::io(format!( + "Failed to write transcript file {}: {}", + transcript_path.display(), + e + )) + })?; + + let transcript = SessionTranscriptExport { + session_id: session_id.to_string(), + transcript_path: transcript_path.to_string_lossy().to_string(), + generated_at, + source_fingerprint, + includes_tools: normalized_options.tools, + includes_tool_inputs: normalized_options.tool_inputs, + includes_thinking: normalized_options.thinking, + turns: normalized_options.turns, + turn_count: turns.len(), + line_count: lines.len(), + index_range, + index, + }; + + self.write_json_atomic( + &transcript_meta_path, + &StoredSessionTranscriptFile { + schema_version: TRANSCRIPT_SCHEMA_VERSION, + transcript: transcript.clone(), + }, + ) + .await?; + + Ok(transcript) + } + pub async fn delete_turns_after( &self, workspace_path: &Path, @@ -1427,3 +2261,127 @@ impl PersistenceManager { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::PersistenceManager; + use crate::infrastructure::PathManager; + use crate::service::session::{ + DialogTurnData, SessionMetadata, SessionTranscriptExportOptions, UserMessageData, + }; + use std::path::{Path, PathBuf}; + use std::sync::Arc; + use uuid::Uuid; + + struct TestWorkspace { + path: PathBuf, + } + + impl TestWorkspace { + fn new() -> Self { + let path = std::env::temp_dir() + .join(format!("bitfun-session-transcript-test-{}", Uuid::new_v4())); + std::fs::create_dir_all(&path).expect("test workspace should be created"); + Self { path } + } + + fn path(&self) -> &Path { + &self.path + } + } + + impl Drop for TestWorkspace { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.path); + } + } + + #[test] + fn transcript_turn_selectors_support_head_and_tail_ranges() { + let selectors = PersistenceManager::parse_transcript_turn_selectors(&[ + ":1".to_string(), + "-3:".to_string(), + ]) + .expect("selectors should parse"); + + let selected = PersistenceManager::transcript_select_turn_indices(8, &selectors); + + assert_eq!(selected, vec![0, 5, 6, 7]); + } + + #[test] + fn transcript_turn_selectors_deduplicate_and_sort_results() { + let selectors = PersistenceManager::parse_transcript_turn_selectors(&[ + "4".to_string(), + "2:5".to_string(), + "-1".to_string(), + ]) + .expect("selectors should parse"); + + let selected = PersistenceManager::transcript_select_turn_indices(6, &selectors); + + assert_eq!(selected, vec![2, 3, 4, 5]); + } + + #[test] + fn transcript_turn_selectors_reject_invalid_syntax() { + let error = PersistenceManager::parse_transcript_turn_selectors(&["1:2:3".to_string()]) + .expect_err("selector should be rejected"); + + assert!( + error.to_string().contains("Invalid turn selector"), + "unexpected error: {}", + error + ); + } + + #[tokio::test] + async fn export_session_transcript_handles_first_selected_turn_without_panicking() { + let workspace = TestWorkspace::new(); + let manager = PersistenceManager::new(Arc::new(PathManager::new().expect("path manager"))) + .expect("persistence manager"); + let session_id = Uuid::new_v4().to_string(); + + let metadata = SessionMetadata::new( + session_id.clone(), + "Transcript test".to_string(), + "agent".to_string(), + "model".to_string(), + ); + manager + .save_session_metadata(workspace.path(), &metadata) + .await + .expect("metadata should save"); + + let user_message = UserMessageData { + id: "user-1".to_string(), + content: "hello transcript".to_string(), + timestamp: 0, + metadata: None, + }; + let mut turn = + DialogTurnData::new("turn-1".to_string(), 0, session_id.clone(), user_message); + turn.mark_completed(); + manager + .save_dialog_turn(workspace.path(), &turn) + .await + .expect("turn should save"); + + let export = manager + .export_session_transcript( + workspace.path(), + &session_id, + &SessionTranscriptExportOptions::default(), + ) + .await + .expect("transcript export should succeed"); + + assert_eq!(export.turn_count, 1); + assert_eq!(export.index.len(), 1); + + let transcript = std::fs::read_to_string(&export.transcript_path) + .expect("transcript file should be readable"); + assert!(transcript.contains("## Turn 0")); + assert!(transcript.contains("hello transcript")); + } +} diff --git a/src/crates/core/src/agentic/tools/implementations/bash_tool.rs b/src/crates/core/src/agentic/tools/implementations/bash_tool.rs index 882da94d..fb31a74d 100644 --- a/src/crates/core/src/agentic/tools/implementations/bash_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/bash_tool.rs @@ -195,6 +195,7 @@ Usage notes: - You can use the `run_in_background` parameter to run the command in a new dedicated background terminal session. The tool returns the background session ID immediately without waiting for the command to finish. Only use this for long-running processes (e.g., dev servers, watchers) where you don't need the output right away. You do not need to append '&' to the command. NOTE: `timeout_ms` is ignored when `run_in_background` is true. - Each result includes a `` tag identifying the terminal session. The persistent shell session ID remains constant throughout the entire conversation; background sessions each have their own unique ID. - The output may include the command echo and/or the shell prompt (e.g., `PS C:\path>`). Do not treat these as part of the command's actual result. + - Avoid interactive commands that may block waiting for user input or open a pager/editor. Prefer non-interactive variants and explicit flags. For example, use `git --no-pager diff` instead of `git diff`, and avoid commands that prompt for confirmation unless the User explicitly asks for them. - Avoid using this tool with the `find`, `grep`, `cat`, `head`, `tail`, `sed`, `awk`, or `echo` commands, unless explicitly instructed or when these commands are truly necessary for the task. Instead, always prefer using the dedicated tools for these commands: - File search: Use Glob (NOT find or ls) diff --git a/src/crates/core/src/agentic/tools/implementations/mod.rs b/src/crates/core/src/agentic/tools/implementations/mod.rs index 5761d80d..7101615e 100644 --- a/src/crates/core/src/agentic/tools/implementations/mod.rs +++ b/src/crates/core/src/agentic/tools/implementations/mod.rs @@ -20,6 +20,7 @@ pub mod mermaid_interactive_tool; pub mod miniapp_init_tool; pub mod session_control_tool; pub mod session_message_tool; +pub mod session_history_tool; pub mod skill_tool; pub mod skills; pub mod task_tool; @@ -49,6 +50,7 @@ pub use mermaid_interactive_tool::MermaidInteractiveTool; pub use miniapp_init_tool::InitMiniAppTool; pub use session_control_tool::SessionControlTool; pub use session_message_tool::SessionMessageTool; +pub use session_history_tool::SessionHistoryTool; pub use skill_tool::SkillTool; pub use task_tool::TaskTool; pub use terminal_control_tool::TerminalControlTool; diff --git a/src/crates/core/src/agentic/tools/implementations/session_control_tool.rs b/src/crates/core/src/agentic/tools/implementations/session_control_tool.rs index 85267f3c..d90d7571 100644 --- a/src/crates/core/src/agentic/tools/implementations/session_control_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/session_control_tool.rs @@ -19,6 +19,23 @@ impl SessionControlTool { Self } + fn current_workspace_session<'a>( + &self, + context: &'a ToolUseContext, + workspace: &str, + ) -> Option<&'a str> { + let current_session_id = context.session_id.as_deref()?; + let current_workspace = context.workspace_root()?; + let normalized_current_workspace = + normalize_path(¤t_workspace.to_string_lossy().to_string()); + + if normalized_current_workspace == workspace { + Some(current_session_id) + } else { + None + } + } + fn validate_session_id(session_id: &str) -> Result<(), String> { if session_id.is_empty() { return Err("session_id cannot be empty".to_string()); @@ -99,19 +116,24 @@ impl SessionControlTool { &self, workspace: &str, sessions: &[crate::agentic::core::SessionSummary], + current_session_id: Option<&str>, ) -> String { if sessions.is_empty() { return format!("No sessions found in workspace '{}'.", workspace); } let mut lines = vec![format!( - "Found {} session(s) in workspace '{}':", + "Found {} session(s) in workspace '{}'", sessions.len(), workspace )]; lines.push(String::new()); + if let Some(current_session_id) = current_session_id { + lines.push(format!("Note: '{}' is your session_id", current_session_id)); + lines.push(String::new()); + } lines.push( - "| Session ID | Session Name | Agent Type | Created At | Last Active At |".to_string(), + "| session_id | session_name | agent_type | created_at | last_active_at |".to_string(), ); lines.push("| --- | --- | --- | --- | --- |".to_string()); for session in sessions { @@ -317,6 +339,23 @@ Optional inputs: meta: None, }; } + if let Some(tool_context) = context { + if let Ok(workspace) = self.resolve_workspace(&parsed.workspace) { + if self.current_workspace_session(tool_context, &workspace) + == Some(session_id) + { + return ValidationResult { + result: false, + message: Some( + "cannot delete the current session from SessionControl" + .to_string(), + ), + error_code: Some(400), + meta: None, + }; + } + } + } } SessionControlAction::List => { if parsed.agent_type.is_some() { @@ -421,6 +460,11 @@ Optional inputs: BitFunError::tool("session_id is required for delete".to_string()) })?; Self::validate_session_id(session_id).map_err(BitFunError::tool)?; + if self.current_workspace_session(context, &workspace) == Some(session_id) { + return Err(BitFunError::tool( + "cannot delete the current session from SessionControl".to_string(), + )); + } let existing_sessions = coordinator.list_sessions(workspace_path).await?; if !existing_sessions @@ -452,14 +496,16 @@ Optional inputs: } SessionControlAction::List => { let sessions = coordinator.list_sessions(workspace_path).await?; + let current_session_id = self.current_workspace_session(context, &workspace); let result_for_assistant = - self.build_list_result_for_assistant(&workspace, &sessions); + self.build_list_result_for_assistant(&workspace, &sessions, current_session_id); Ok(vec![ToolResult::Result { data: json!({ "success": true, "action": "list", "workspace": workspace.clone(), + "current_session_id": current_session_id, "count": sessions.len(), "sessions": sessions, }), diff --git a/src/crates/core/src/agentic/tools/implementations/session_history_tool.rs b/src/crates/core/src/agentic/tools/implementations/session_history_tool.rs new file mode 100644 index 00000000..8883152c --- /dev/null +++ b/src/crates/core/src/agentic/tools/implementations/session_history_tool.rs @@ -0,0 +1,318 @@ +use super::util::normalize_path; +use crate::agentic::persistence::PersistenceManager; +use crate::agentic::tools::framework::{ + Tool, ToolRenderOptions, ToolResult, ToolUseContext, ValidationResult, +}; +use crate::infrastructure::PathManager; +use crate::service::session::SessionTranscriptExportOptions; +use crate::util::errors::{BitFunError, BitFunResult}; +use async_trait::async_trait; +use serde::Deserialize; +use serde_json::{json, Value}; +use std::path::Path; +use std::sync::Arc; + +/// SessionHistory tool - export a grep-friendly transcript file for a session. +pub struct SessionHistoryTool; + +impl SessionHistoryTool { + pub fn new() -> Self { + Self + } + + fn validate_session_id(session_id: &str) -> Result<(), String> { + if session_id.is_empty() { + return Err("session_id cannot be empty".to_string()); + } + if session_id == "." || session_id == ".." { + return Err("session_id cannot be '.' or '..'".to_string()); + } + if session_id.contains('/') || session_id.contains('\\') { + return Err("session_id cannot contain path separators".to_string()); + } + if !session_id + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + { + return Err( + "session_id can only contain ASCII letters, numbers, '-' and '_'".to_string(), + ); + } + Ok(()) + } + + fn resolve_workspace(&self, workspace: &str) -> BitFunResult { + let workspace = workspace.trim(); + if workspace.is_empty() { + return Err(BitFunError::tool( + "workspace is required and cannot be empty".to_string(), + )); + } + + let path = Path::new(workspace); + if !path.is_absolute() { + return Err(BitFunError::tool( + "workspace must be an absolute path".to_string(), + )); + } + + let resolved = normalize_path(workspace); + let path = Path::new(&resolved); + if !path.exists() { + return Err(BitFunError::tool(format!( + "Workspace does not exist: {}", + resolved + ))); + } + if !path.is_dir() { + return Err(BitFunError::tool(format!( + "Workspace is not a directory: {}", + resolved + ))); + } + + Ok(resolved) + } + + fn resolve_session_id(&self, session_id: &str) -> BitFunResult { + let session_id = session_id.trim().to_string(); + Self::validate_session_id(&session_id).map_err(BitFunError::tool)?; + Ok(session_id) + } +} + +#[derive(Debug, Clone, Deserialize)] +struct SessionHistoryInput { + workspace: String, + session_id: String, + #[serde(default)] + tools: Option, + #[serde(default)] + tool_inputs: Option, + #[serde(default)] + thinking: Option, + #[serde(default)] + turns: Option>, +} + +#[async_trait] +impl Tool for SessionHistoryTool { + fn name(&self) -> &str { + "SessionHistory" + } + + async fn description(&self) -> BitFunResult { + Ok( + r#"Use this tool when you need the history of an agent session. + +This tool does not return full details directly. Instead, it exports a transcript file. The result includes the transcript file path together with index location hints. + +The transcript file starts with a compact index. Each index entry includes the turn number, a short preview, and line ranges you can use for targeted reads. + +Recommended workflow: +1. Call this tool. +2. Read only the index line range from the returned transcript path first. +3. Inspect the on-file index header to find the turn you want. +4. Read only the matching `range`. + +Typical usage: +- To review session history across a workspace, first use `SessionControl` to list the sessions in that workspace, then call this tool for the sessions you want to inspect. +- To inspect the latest state of a specific session, call this tool with `turns=["-1:"]` to export only the last turn. + +Minimal transcript example: + +## Index +- turn=0 range=4-7 preview="Fix failing login test" + +## Turn 0 +[user] +Fix failing login test +[/user] + +In the example above, read lines `1-2` first, then jump directly to `range=4-7`. + +`turns` parameter: +- Optional list of turn selectors. +- Supports selectors such as `":1"`, `"-20:"`, `":1" + "-5:"`, and `"10:30"`. +- When omitted, exports all turns. + +Examples: +1. Export the full transcript: leave `turns` empty +2. Export the first turn only: `turns=[":1"]` +3. Export the last 3 turns: `turns=["-3:"]` +4. Export the first turn and the last 3 turns: `turns=[":1", "-3:"]` +5. Export a middle range: `turns=["2:5"]`"# + .to_string(), + ) + } + + fn input_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "workspace": { + "type": "string", + "description": "Required absolute workspace path." + }, + "session_id": { + "type": "string", + "description": "Required session ID to export." + }, + "tools": { + "type": "boolean", + "description": "Whether to include tool sections. Defaults to false." + }, + "tool_inputs": { + "type": "boolean", + "description": "Whether to include tool input parameters in tool sections. Defaults to false. Only applies when tools is true." + }, + "thinking": { + "type": "boolean", + "description": "Whether to include thinking sections. Defaults to false." + }, + "turns": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Optional list of turn selectors. Supports index and start:end forms such as \":1\", \"-20:\", \"10:30\", or \"15\"." + } + }, + "required": ["workspace", "session_id"], + "additionalProperties": false + }) + } + + fn is_readonly(&self) -> bool { + true + } + + fn needs_permissions(&self, _input: Option<&Value>) -> bool { + false + } + + async fn validate_input( + &self, + input: &Value, + _context: Option<&ToolUseContext>, + ) -> ValidationResult { + let parsed: SessionHistoryInput = match serde_json::from_value(input.clone()) { + Ok(value) => value, + Err(err) => { + return ValidationResult { + result: false, + message: Some(format!("Invalid input: {}", err)), + error_code: Some(400), + meta: None, + }; + } + }; + + if parsed.workspace.trim().is_empty() { + return ValidationResult { + result: false, + message: Some("workspace is required and cannot be empty".to_string()), + error_code: Some(400), + meta: None, + }; + } + + if !Path::new(parsed.workspace.trim()).is_absolute() { + return ValidationResult { + result: false, + message: Some("workspace must be an absolute path".to_string()), + error_code: Some(400), + meta: None, + }; + } + + if parsed.session_id.trim().is_empty() { + return ValidationResult { + result: false, + message: Some("session_id cannot be empty".to_string()), + error_code: Some(400), + meta: None, + }; + } + + if let Err(message) = Self::validate_session_id(parsed.session_id.trim()) { + return ValidationResult { + result: false, + message: Some(message), + error_code: Some(400), + meta: None, + }; + } + + if parsed.turns.as_ref().is_some_and(|selectors| selectors.is_empty()) { + return ValidationResult { + result: false, + message: Some("turns cannot be an empty array".to_string()), + error_code: Some(400), + meta: None, + }; + } + + if parsed + .turns + .as_ref() + .is_some_and(|selectors| selectors.iter().any(|selector| selector.trim().is_empty())) + { + return ValidationResult { + result: false, + message: Some("turns cannot contain empty selectors".to_string()), + error_code: Some(400), + meta: None, + }; + } + + ValidationResult::default() + } + + fn render_tool_use_message(&self, input: &Value, _options: &ToolRenderOptions) -> String { + let session_id = input + .get("session_id") + .and_then(|value| value.as_str()) + .unwrap_or("unknown session"); + format!("Export transcript for {}", session_id) + } + + async fn call_impl( + &self, + input: &Value, + _context: &ToolUseContext, + ) -> BitFunResult> { + let params: SessionHistoryInput = serde_json::from_value(input.clone()) + .map_err(|e| BitFunError::tool(format!("Invalid input: {}", e)))?; + + let workspace = self.resolve_workspace(¶ms.workspace)?; + let session_id = self.resolve_session_id(¶ms.session_id)?; + let manager = PersistenceManager::new(Arc::new(PathManager::new()?))?; + let transcript = manager + .export_session_transcript( + Path::new(&workspace), + &session_id, + &SessionTranscriptExportOptions { + tools: params.tools.unwrap_or(false), + tool_inputs: params.tool_inputs.unwrap_or(false), + thinking: params.thinking.unwrap_or(false), + turns: params.turns, + }, + ) + .await?; + + Ok(vec![ToolResult::Result { + data: json!({ + "success": true, + "workspace": workspace, + "transcript": transcript, + }), + result_for_assistant: Some(format!( + "Transcript exported to '{}'. The index is on lines {}-{}. Read that range first, then use Grep or Read on that path for targeted navigation.", + transcript.transcript_path, + transcript.index_range.start_line, + transcript.index_range.end_line + )), + }]) + } +} diff --git a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs index dfc61445..d0c2f5bf 100644 --- a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs +++ b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs @@ -189,6 +189,13 @@ impl ToolStateManager { tool_id: task.tool_call.tool_id.clone(), tool_name: task.tool_call.tool_name.clone(), result: Self::sanitize_tool_result_for_event(&result.content()), + result_for_assistant: match result { + crate::agentic::tools::framework::ToolResult::Result { + result_for_assistant, + .. + } => result_for_assistant.clone(), + _ => None, + }, duration_ms: *duration_ms, }, diff --git a/src/crates/core/src/agentic/tools/registry.rs b/src/crates/core/src/agentic/tools/registry.rs index 300d5ceb..0be9ec23 100644 --- a/src/crates/core/src/agentic/tools/registry.rs +++ b/src/crates/core/src/agentic/tools/registry.rs @@ -92,6 +92,7 @@ impl ToolRegistry { self.register_tool(Arc::new(TerminalControlTool::new())); self.register_tool(Arc::new(SessionControlTool::new())); self.register_tool(Arc::new(SessionMessageTool::new())); + self.register_tool(Arc::new(SessionHistoryTool::new())); // TodoWrite tool self.register_tool(Arc::new(TodoWriteTool::new())); diff --git a/src/crates/core/src/service/agent_memory/agent_memory.rs b/src/crates/core/src/service/agent_memory/agent_memory.rs index ba02df70..e5a47953 100644 --- a/src/crates/core/src/service/agent_memory/agent_memory.rs +++ b/src/crates/core/src/service/agent_memory/agent_memory.rs @@ -1,11 +1,6 @@ -use crate::agentic::core::{strip_prompt_markup, SessionSummary}; -use crate::agentic::persistence::PersistenceManager; -use crate::infrastructure::try_get_path_manager_arc; -use crate::service::session::DialogTurnData; use crate::util::errors::*; -use log::{debug, warn}; +use log::debug; use std::path::{Path, PathBuf}; -use std::time::SystemTime; use tokio::fs; const MEMORY_DIR_NAME: &str = "memory"; @@ -15,8 +10,6 @@ const MEMORY_INDEX_TEMPLATE: &str = "# Memory Index\n"; const MEMORY_INDEX_MAX_LINES: usize = 200; const DAILY_MEMORY_MAX_FILES: usize = 30; const TOPIC_MEMORY_MAX_FILES: usize = 30; -const RECENT_WORKSPACE_SESSIONS_MAX_COUNT: usize = 5; -const SESSION_MESSAGE_PREVIEW_CHAR_LIMIT: usize = 100; fn memory_dir_path(workspace_root: &Path) -> PathBuf { workspace_root.join(BITFUN_DIR_NAME).join(MEMORY_DIR_NAME) @@ -97,143 +90,6 @@ async fn list_memory_file_groups(memory_dir: &Path) -> BitFunResult<(Vec Ok((daily_files, topic_files)) } -fn format_system_time_for_prompt(time: SystemTime) -> String { - let datetime: chrono::DateTime = time.into(); - datetime.format("%Y-%m-%dT%H:%M:%S").to_string() -} - -fn normalize_message_preview(content: &str) -> String { - let normalized = content.split_whitespace().collect::>().join(" "); - if normalized.is_empty() { - return "(empty user message)".to_string(); - } - - normalized - .chars() - .take(SESSION_MESSAGE_PREVIEW_CHAR_LIMIT) - .collect() -} - -fn escape_prompt_text(value: &str) -> String { - value - .replace('&', "&") - .replace('<', "<") - .replace('>', ">") - .replace('"', """) -} - -fn build_turn_preview_value(turn: Option<&DialogTurnData>) -> String { - // Session previews should reflect the user's visible text, not injected prompt markup. - turn.map(|turn| normalize_message_preview(&strip_prompt_markup(&turn.user_message.content))) - .unwrap_or_else(|| "(no saved user messages)".to_string()) -} - -fn build_session_preview_tags(turns: &[DialogTurnData]) -> String { - match turns { - [] => "(no saved user messages)".to_string(), - [turn] => format!( - "{}", - escape_prompt_text(&build_turn_preview_value(Some(turn))) - ), - _ => format!( - "{}\n{}", - escape_prompt_text(&build_turn_preview_value(turns.first())), - escape_prompt_text(&build_turn_preview_value(turns.last())), - ), - } -} - -async fn build_recent_workspace_sessions_content( - workspace_root: &Path, - current_session_id: Option<&str>, -) -> String { - let path_manager = match try_get_path_manager_arc() { - Ok(path_manager) => path_manager, - Err(err) => { - warn!( - "Failed to initialize PathManager for workspace session prompt section: {}", - err - ); - return "(workspace session data unavailable)".to_string(); - } - }; - - let persistence_manager = match PersistenceManager::new(path_manager) { - Ok(manager) => manager, - Err(err) => { - warn!( - "Failed to initialize PersistenceManager for workspace session prompt section: {}", - err - ); - return "(workspace session data unavailable)".to_string(); - } - }; - - let recent_sessions = match persistence_manager.list_sessions(workspace_root).await { - Ok(sessions) => sessions - .into_iter() - .filter(|session| Some(session.session_id.as_str()) != current_session_id) - .take(RECENT_WORKSPACE_SESSIONS_MAX_COUNT) - .collect::>(), - Err(err) => { - warn!( - "Failed to load workspace sessions for agent memory prompt: workspace={} error={}", - workspace_root.display(), - err - ); - return "(workspace session data unavailable)".to_string(); - } - }; - - if recent_sessions.is_empty() { - return "(no other recent workspace sessions found)".to_string(); - } - - let mut entries = Vec::with_capacity(recent_sessions.len()); - for session in recent_sessions { - entries.push( - build_recent_workspace_session_entry(&persistence_manager, workspace_root, &session) - .await, - ); - } - - entries.join("\n\n") -} - -async fn build_recent_workspace_session_entry( - persistence_manager: &PersistenceManager, - workspace_root: &Path, - session: &SessionSummary, -) -> String { - let turns = match persistence_manager - .load_session_turns(workspace_root, &session.session_id) - .await - { - Ok(turns) => turns, - Err(err) => { - warn!( - "Failed to load session turns for agent memory prompt: session_id={} error={}", - session.session_id, err - ); - Vec::new() - } - }; - - let preview_tags = build_session_preview_tags(&turns); - - format!( - r#" -{} -"#, - escape_prompt_text(&session.session_id), - escape_prompt_text(&session.session_name), - format_system_time_for_prompt(session.created_at), - format_system_time_for_prompt(session.last_activity_at), - session.turn_count, - preview_tags, - ) -} - pub(crate) async fn ensure_workspace_memory_files_for_prompt( workspace_root: &Path, ) -> BitFunResult<()> { @@ -262,7 +118,6 @@ pub(crate) async fn ensure_workspace_memory_files_for_prompt( pub(crate) async fn build_workspace_agent_memory_prompt( workspace_root: &Path, - current_session_id: Option<&str>, ) -> BitFunResult { ensure_workspace_memory_files_for_prompt(workspace_root).await?; @@ -277,18 +132,17 @@ You have access to a workspace memory space under `{memory_dir_display}`. Use it to preserve continuity across conversations. Save only information that is likely to help in future turns: durable preferences, project constraints, important decisions, ongoing plans, and meaningful outcomes. Do not save trivial chatter or temporary details. -## How to use memory -- Read: use Grep/Read to search and retrieve memories when past preferences, decisions, constraints, or ongoing work may matter, especially at the start of a new task, before making decisions, or when the user refers to prior plans or preferences. -- Write: use Edit/Write to create or update memory files when something should survive beyond the current turn. +## Memory usage +Use Grep/Read to search and retrieve memories before you start acting on a task, or when the user mentions facts, preferences, decisions, or plans that are not present in the current context and memory may fill the gap. -Write especially for: +## Memory update +Use Edit/Write to create or update memory files when something should survive beyond the current turn. Especially for: - stable user preferences - project constraints or conventions - important decisions - progress, plans, or handoff context - knowledge a future agent should not need to rediscover -Heuristic: if you expect to want this in a future session, save a short note. -Write once the information is clear enough to be useful. Prefer natural pauses or completed work; do not wait for a formal session end. +Heuristic: if you expect to want this in a future session, save a short note. Remember to update memory when you complete a task. ## File roles - `memory.md`: the concise index. Link to important memory files with short summaries, not full details. Use it as a map, not the place for the full facts. @@ -364,9 +218,6 @@ The following sections describe the memory files currently available in this wor .join("\n") }; - let recent_workspace_sessions_content = - build_recent_workspace_sessions_content(workspace_root, current_session_id).await; - section.push_str(&format!( r#" @@ -381,12 +232,9 @@ The following sections describe the memory files currently available in this wor {topic_files_content} -## Recent Sessions Preview -Structured summaries for up to 5 recent sessions from this workspace, excluding the current session. User message previews are single-line and truncated to the first {SESSION_MESSAGE_PREVIEW_CHAR_LIMIT} characters. +## Recent Sessions - -{recent_workspace_sessions_content} - +If you need the most detailed conversation history, first use SessionControl to list sessions in the current workspace, then use SessionHistory to retrieve the conversation history for the session you want. "# )); diff --git a/src/crates/core/src/service/session/types.rs b/src/crates/core/src/service/session/types.rs index 0e2450a4..cb8ef23b 100644 --- a/src/crates/core/src/service/session/types.rs +++ b/src/crates/core/src/service/session/types.rs @@ -312,14 +312,97 @@ pub struct ToolCallData { pub struct ToolResultData { pub result: serde_json::Value, pub success: bool, + #[serde( + skip_serializing_if = "Option::is_none", + alias = "result_for_assistant" + )] + pub result_for_assistant: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, #[serde(skip_serializing_if = "Option::is_none", alias = "duration_ms")] pub duration_ms: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct TranscriptLineRange { + pub start_line: usize, + pub end_line: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct SessionTranscriptIndexEntry { + #[serde(alias = "turn_index")] + pub turn_index: usize, + pub preview: String, + #[serde(alias = "turn_range")] + pub turn_range: TranscriptLineRange, + #[serde(alias = "user_range")] + pub user_range: TranscriptLineRange, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct SessionTranscriptExportOptions { + #[serde(default)] + pub tools: bool, + #[serde(default)] + pub tool_inputs: bool, + #[serde(default)] + pub thinking: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub turns: Option>, +} + +impl Default for SessionTranscriptExportOptions { + fn default() -> Self { + Self { + tools: false, + tool_inputs: false, + thinking: false, + turns: None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct SessionTranscriptExport { + #[serde(alias = "session_id")] + pub session_id: String, + #[serde(alias = "transcript_path")] + pub transcript_path: String, + #[serde(alias = "generated_at")] + pub generated_at: u64, + #[serde(alias = "source_fingerprint")] + pub source_fingerprint: String, + #[serde(alias = "includes_tools")] + pub includes_tools: bool, + #[serde(default, alias = "includes_tool_inputs")] + pub includes_tool_inputs: bool, + #[serde(alias = "includes_thinking")] + pub includes_thinking: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub turns: Option>, + #[serde(alias = "turn_count")] + pub turn_count: usize, + #[serde(alias = "line_count")] + pub line_count: usize, + #[serde(default = "default_transcript_line_range", alias = "index_range")] + pub index_range: TranscriptLineRange, + pub index: Vec, +} + +fn default_transcript_line_range() -> TranscriptLineRange { + TranscriptLineRange { + start_line: 0, + end_line: 0, + } +} + /// Turn status -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum TurnStatus { InProgress, diff --git a/src/crates/events/src/agentic.rs b/src/crates/events/src/agentic.rs index dc607807..9735e8fc 100644 --- a/src/crates/events/src/agentic.rs +++ b/src/crates/events/src/agentic.rs @@ -243,6 +243,8 @@ pub enum ToolEventData { tool_id: String, tool_name: String, result: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + result_for_assistant: Option, duration_ms: u64, }, Failed { diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts index 50f2d71f..ebd107ff 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts @@ -269,6 +269,7 @@ function handleCompleted( toolResult: { result: toolEvent.result, success: true, + resultForAssistant: toolEvent.result_for_assistant, duration_ms: toolEvent.duration_ms }, status: 'completed', diff --git a/src/web-ui/src/flow_chat/types/flow-chat.ts b/src/web-ui/src/flow_chat/types/flow-chat.ts index d9dbe846..17be83d9 100644 --- a/src/web-ui/src/flow_chat/types/flow-chat.ts +++ b/src/web-ui/src/flow_chat/types/flow-chat.ts @@ -42,6 +42,7 @@ export interface FlowToolItem extends FlowItem { toolResult?: { result: any; success: boolean; + resultForAssistant?: string; error?: string; duration_ms?: number; }; diff --git a/src/web-ui/src/shared/types/session-history.ts b/src/web-ui/src/shared/types/session-history.ts index 39c6f8b7..31c4efee 100644 --- a/src/web-ui/src/shared/types/session-history.ts +++ b/src/web-ui/src/shared/types/session-history.ts @@ -110,6 +110,7 @@ export interface ToolCallData { export interface ToolResultData { result: any; success: boolean; + resultForAssistant?: string; error?: string; durationMs?: number; } diff --git a/src/web-ui/src/shared/types/tool-events.ts b/src/web-ui/src/shared/types/tool-events.ts index dc7df450..bdd7e66d 100644 --- a/src/web-ui/src/shared/types/tool-events.ts +++ b/src/web-ui/src/shared/types/tool-events.ts @@ -36,6 +36,7 @@ export interface ToolStartEvent extends BaseToolEvent { export interface ToolCompleteEvent extends BaseToolEvent { type: 'tool_complete'; result: any; + result_for_assistant?: string; duration_ms: number; success: boolean; error?: string; From 4879961f26d02ad0411a35e61d52b289c46acfaa Mon Sep 17 00:00:00 2001 From: wsp Date: Tue, 17 Mar 2026 19:20:22 +0800 Subject: [PATCH 2/3] feat(cron): add cron service and ScheduledJobsDialog --- Cargo.toml | 2 + src/apps/desktop/src/api/cron_api.rs | 93 ++ src/apps/desktop/src/api/mod.rs | 1 + src/apps/desktop/src/lib.rs | 17 + src/crates/core/Cargo.toml | 2 + .../src/agentic/coordination/coordinator.rs | 1 + .../src/agentic/coordination/scheduler.rs | 1 + .../infrastructure/filesystem/path_manager.rs | 11 + src/crates/core/src/service/cron/mod.rs | 14 + src/crates/core/src/service/cron/schedule.rs | 235 +++++ src/crates/core/src/service/cron/service.rs | 664 +++++++++++++ src/crates/core/src/service/cron/store.rs | 61 ++ .../core/src/service/cron/subscriber.rs | 50 + src/crates/core/src/service/cron/types.rs | 139 +++ src/crates/core/src/service/mod.rs | 4 + .../src/app/components/NavPanel/MainNav.tsx | 104 +- .../ScheduledJobsDialog.scss | 330 +++++++ .../ScheduledJobsDialog.tsx | 890 ++++++++++++++++++ .../ConfirmDialog/ConfirmDialog.scss | 4 + .../ConfirmDialog/ConfirmDialog.tsx | 7 +- src/web-ui/src/infrastructure/api/index.ts | 5 +- .../infrastructure/api/service-api/CronAPI.ts | 120 +++ .../src/infrastructure/i18n/hooks/useI18n.ts | 11 +- src/web-ui/src/locales/en-US/common.json | 87 ++ src/web-ui/src/locales/zh-CN/common.json | 87 ++ 25 files changed, 2923 insertions(+), 17 deletions(-) create mode 100644 src/apps/desktop/src/api/cron_api.rs create mode 100644 src/crates/core/src/service/cron/mod.rs create mode 100644 src/crates/core/src/service/cron/schedule.rs create mode 100644 src/crates/core/src/service/cron/service.rs create mode 100644 src/crates/core/src/service/cron/store.rs create mode 100644 src/crates/core/src/service/cron/subscriber.rs create mode 100644 src/crates/core/src/service/cron/types.rs create mode 100644 src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.scss create mode 100644 src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.tsx create mode 100644 src/web-ui/src/infrastructure/api/service-api/CronAPI.ts diff --git a/Cargo.toml b/Cargo.toml index 16179296..f249c28c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,8 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Utilities uuid = { version = "1.0", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde", "clock"] } +chrono-tz = "0.10.4" +cron = "0.15.0" regex = "1" base64 = "0.22" image = { version = "0.25", default-features = false, features = ["png", "jpeg", "gif", "webp", "bmp"] } diff --git a/src/apps/desktop/src/api/cron_api.rs b/src/apps/desktop/src/api/cron_api.rs new file mode 100644 index 00000000..4e086ba5 --- /dev/null +++ b/src/apps/desktop/src/api/cron_api.rs @@ -0,0 +1,93 @@ +//! Scheduled jobs API. + +use bitfun_core::service::cron::{ + get_global_cron_service, CreateCronJobRequest, CronJob, UpdateCronJobRequest, +}; +use log::{debug, error}; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListCronJobsRequest { + pub workspace_path: Option, + pub session_id: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateCronJobCommandRequest { + pub job_id: String, + #[serde(flatten)] + pub changes: UpdateCronJobRequest, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DeleteCronJobRequest { + pub job_id: String, +} + +fn cron_service() -> Result, String> { + get_global_cron_service().ok_or_else(|| "Cron service is not initialized".to_string()) +} + +#[tauri::command] +pub async fn list_cron_jobs(request: ListCronJobsRequest) -> Result, String> { + debug!( + "Listing scheduled jobs: workspace_path={:?}, session_id={:?}", + request.workspace_path, request.session_id + ); + + let service = cron_service()?; + Ok(service + .list_jobs_filtered( + request.workspace_path.as_deref(), + request.session_id.as_deref(), + ) + .await) +} + +#[tauri::command] +pub async fn create_cron_job(request: CreateCronJobRequest) -> Result { + debug!( + "Creating scheduled job: name={}, session_id={}, workspace_path={}", + request.name, request.session_id, request.workspace_path + ); + + let service = cron_service()?; + service.create_job(request).await.map_err(|error| { + error!("Failed to create scheduled job: {}", error); + format!("Failed to create scheduled job: {}", error) + }) +} + +#[tauri::command] +pub async fn update_cron_job(request: UpdateCronJobCommandRequest) -> Result { + debug!("Updating scheduled job: job_id={}", request.job_id); + + let service = cron_service()?; + service + .update_job(&request.job_id, request.changes) + .await + .map_err(|error| { + error!( + "Failed to update scheduled job {}: {}", + request.job_id, error + ); + format!("Failed to update scheduled job: {}", error) + }) +} + +#[tauri::command] +pub async fn delete_cron_job(request: DeleteCronJobRequest) -> Result { + debug!("Deleting scheduled job: job_id={}", request.job_id); + + let service = cron_service()?; + service.delete_job(&request.job_id).await.map_err(|error| { + error!( + "Failed to delete scheduled job {}: {}", + request.job_id, error + ); + format!("Failed to delete scheduled job: {}", error) + }) +} diff --git a/src/apps/desktop/src/api/mod.rs b/src/apps/desktop/src/api/mod.rs index 84efbc1c..77d7a8fc 100644 --- a/src/apps/desktop/src/api/mod.rs +++ b/src/apps/desktop/src/api/mod.rs @@ -10,6 +10,7 @@ pub mod clipboard_file_api; pub mod commands; pub mod config_api; pub mod context_upload_api; +pub mod cron_api; pub mod diff_api; pub mod dto; pub mod git_agent_api; diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 0a85fbbc..196ae708 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -26,6 +26,7 @@ use api::ai_rules_api::*; use api::clipboard_file_api::*; use api::commands::*; use api::config_api::*; +use api::cron_api::*; use api::diff_api::*; use api::git_agent_api::*; use api::git_api::*; @@ -563,6 +564,10 @@ pub async fn run() { reorder_opened_workspaces, get_current_workspace, scan_workspace_info, + list_cron_jobs, + create_cron_job, + update_cron_job, + delete_cron_job, api::prompt_template_api::get_prompt_template_config, api::prompt_template_api::save_prompt_template_config, api::prompt_template_api::export_prompt_templates, @@ -743,6 +748,18 @@ async fn init_agentic_system() -> anyhow::Result<( coordinator.set_scheduler_notifier(scheduler.outcome_sender()); coordination::set_global_scheduler(scheduler.clone()); + let cron_service = + bitfun_core::service::cron::CronService::new(path_manager.clone(), scheduler.clone()) + .await + .map_err(|e| anyhow::anyhow!("Failed to initialize cron service: {}", e))?; + bitfun_core::service::cron::set_global_cron_service(cron_service.clone()); + let cron_subscriber = Arc::new(bitfun_core::service::cron::CronEventSubscriber::new( + cron_service.clone(), + )); + event_router.subscribe_internal("cron_jobs".to_string(), cron_subscriber); + cron_service.start(); + + log::info!("Cron service initialized and subscriber registered"); log::info!("Agentic system initialized"); Ok(( coordinator, diff --git a/src/crates/core/Cargo.toml b/src/crates/core/Cargo.toml index bc3d0994..f2a56564 100644 --- a/src/crates/core/Cargo.toml +++ b/src/crates/core/Cargo.toml @@ -28,6 +28,8 @@ log = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } +chrono-tz = { workspace = true } +cron = { workspace = true } regex = { workspace = true } base64 = { workspace = true } image = { workspace = true } diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index f692b403..5b5ca640 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -39,6 +39,7 @@ pub enum DialogTriggerSource { DesktopUi, DesktopApi, AgentSession, + ScheduledJob, RemoteRelay, Bot, Cli, diff --git a/src/crates/core/src/agentic/coordination/scheduler.rs b/src/crates/core/src/agentic/coordination/scheduler.rs index 188544c7..71631529 100644 --- a/src/crates/core/src/agentic/coordination/scheduler.rs +++ b/src/crates/core/src/agentic/coordination/scheduler.rs @@ -54,6 +54,7 @@ impl DialogSubmissionPolicy { pub const fn for_source(trigger_source: DialogTriggerSource) -> Self { let (queue_priority, skip_tool_confirmation) = match trigger_source { DialogTriggerSource::AgentSession => (DialogQueuePriority::Low, true), + DialogTriggerSource::ScheduledJob => (DialogQueuePriority::Low, true), DialogTriggerSource::DesktopUi | DialogTriggerSource::DesktopApi | DialogTriggerSource::Cli => (DialogQueuePriority::Normal, false), diff --git a/src/crates/core/src/infrastructure/filesystem/path_manager.rs b/src/crates/core/src/infrastructure/filesystem/path_manager.rs index 2c9f47db..a687d434 100644 --- a/src/crates/core/src/infrastructure/filesystem/path_manager.rs +++ b/src/crates/core/src/infrastructure/filesystem/path_manager.rs @@ -214,6 +214,16 @@ impl PathManager { self.user_root.join("data") } + /// Get scheduled jobs directory: ~/.config/bitfun/data/cron/ + pub fn user_cron_dir(&self) -> PathBuf { + self.user_data_dir().join("cron") + } + + /// Get scheduled jobs persistence file: ~/.config/bitfun/data/cron/jobs.json + pub fn cron_jobs_file(&self) -> PathBuf { + self.user_cron_dir().join("jobs.json") + } + /// Get miniapps root directory: ~/.config/bitfun/data/miniapps/ pub fn miniapps_dir(&self) -> PathBuf { self.user_data_dir().join("miniapps") @@ -374,6 +384,7 @@ impl PathManager { self.cache_dir(CacheType::Git), self.cache_dir(CacheType::Index), self.user_data_dir(), + self.user_cron_dir(), self.user_rules_dir(), self.history_dir(), self.snippets_dir(), diff --git a/src/crates/core/src/service/cron/mod.rs b/src/crates/core/src/service/cron/mod.rs new file mode 100644 index 00000000..000ea19f --- /dev/null +++ b/src/crates/core/src/service/cron/mod.rs @@ -0,0 +1,14 @@ +//! Scheduled job service. + +mod schedule; +mod service; +mod store; +mod subscriber; +mod types; + +pub use service::{get_global_cron_service, set_global_cron_service, CronService}; +pub use subscriber::CronEventSubscriber; +pub use types::{ + CreateCronJobRequest, CronJob, CronJobPayload, CronJobRunStatus, CronJobState, CronJobsFile, + CronSchedule, UpdateCronJobRequest, CRON_JOBS_VERSION, DEFAULT_RETRY_DELAY_MS, +}; diff --git a/src/crates/core/src/service/cron/schedule.rs b/src/crates/core/src/service/cron/schedule.rs new file mode 100644 index 00000000..d5ddd979 --- /dev/null +++ b/src/crates/core/src/service/cron/schedule.rs @@ -0,0 +1,235 @@ +//! Schedule calculation helpers. + +use super::types::{CronJob, CronSchedule}; +use crate::util::errors::{BitFunError, BitFunResult}; +use chrono::{DateTime, Local, TimeZone, Utc}; +use chrono_tz::Tz; +use cron::Schedule; +use std::str::FromStr; + +pub fn validate_schedule(schedule: &CronSchedule, created_at_ms: i64) -> BitFunResult<()> { + let _ = compute_next_run_after_ms(schedule, created_at_ms, created_at_ms - 1)?; + Ok(()) +} + +pub fn compute_initial_next_run_at_ms(job: &CronJob, now_ms: i64) -> BitFunResult> { + match &job.schedule { + CronSchedule::At { .. } => { + if job.state.last_enqueued_at_ms.is_some() || job.state.active_turn_id.is_some() { + return Ok(None); + } + + parse_at_timestamp_ms(&job.schedule).map(Some) + } + _ => compute_next_run_after_ms(&job.schedule, job.created_at_ms, now_ms), + } +} + +pub fn compute_next_run_after_ms( + schedule: &CronSchedule, + created_at_ms: i64, + after_ms: i64, +) -> BitFunResult> { + match schedule { + CronSchedule::At { .. } => { + let at_ms = parse_at_timestamp_ms(schedule)?; + if at_ms > after_ms { + Ok(Some(at_ms)) + } else { + Ok(None) + } + } + CronSchedule::Every { + every_ms, + anchor_ms, + } => compute_every_next_run_ms(*every_ms, anchor_ms.unwrap_or(created_at_ms), after_ms) + .map(Some), + CronSchedule::Cron { expr, tz } => compute_cron_next_run_ms(expr, tz.as_deref(), after_ms) + .map(Some), + } +} + +pub fn parse_at_timestamp_ms(schedule: &CronSchedule) -> BitFunResult { + let CronSchedule::At { at } = schedule else { + return Err(BitFunError::validation( + "parse_at_timestamp_ms requires an 'at' schedule", + )); + }; + + let parsed = DateTime::parse_from_rfc3339(at).map_err(|error| { + BitFunError::validation(format!("Invalid ISO-8601 timestamp '{}': {}", at, error)) + })?; + Ok(parsed.timestamp_millis()) +} + +fn compute_every_next_run_ms(every_ms: u64, anchor_ms: i64, after_ms: i64) -> BitFunResult { + if every_ms == 0 { + return Err(BitFunError::validation( + "Recurring schedule everyMs must be greater than 0", + )); + } + + if anchor_ms > after_ms { + return Ok(anchor_ms); + } + + let interval = i128::from(every_ms); + let anchor = i128::from(anchor_ms); + let after = i128::from(after_ms); + let steps = ((after - anchor) / interval) + 1; + let next = anchor + (steps * interval); + + i64::try_from(next).map_err(|_| { + BitFunError::service("Recurring schedule next run timestamp overflowed i64") + }) +} + +fn compute_cron_next_run_ms(expr: &str, tz: Option<&str>, after_ms: i64) -> BitFunResult { + let normalized_expr = normalize_cron_expr(expr)?; + let schedule = Schedule::from_str(&normalized_expr).map_err(|error| { + BitFunError::validation(format!("Invalid cron expression '{}': {}", expr, error)) + })?; + + match tz { + Some(tz_name) => { + let timezone = parse_timezone(tz_name)?; + let after = timezone + .timestamp_millis_opt(after_ms) + .single() + .ok_or_else(|| { + BitFunError::validation(format!( + "Unable to interpret timestamp {} in timezone {}", + after_ms, tz_name + )) + })?; + + schedule + .after(&after) + .next() + .map(|next| next.with_timezone(&Utc).timestamp_millis()) + .ok_or_else(|| { + BitFunError::validation(format!( + "Cron expression '{}' produced no future run time", + expr + )) + }) + } + None => { + let after = Local + .timestamp_millis_opt(after_ms) + .single() + .ok_or_else(|| { + BitFunError::validation(format!( + "Unable to interpret local timestamp {}", + after_ms + )) + })?; + + schedule + .after(&after) + .next() + .map(|next| next.with_timezone(&Utc).timestamp_millis()) + .ok_or_else(|| { + BitFunError::validation(format!( + "Cron expression '{}' produced no future run time", + expr + )) + }) + } + } +} + +fn parse_timezone(tz_name: &str) -> BitFunResult { + Tz::from_str(tz_name).map_err(|error| { + BitFunError::validation(format!("Invalid timezone '{}': {}", tz_name, error)) + }) +} + +fn normalize_cron_expr(expr: &str) -> BitFunResult { + let fields = expr.split_whitespace().collect::>(); + match fields.len() { + 5 => Ok(format!("0 {}", expr)), + 6 | 7 => Ok(expr.to_string()), + other => Err(BitFunError::validation(format!( + "Cron expression '{}' must contain 5, 6, or 7 fields, found {}", + expr, other + ))), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::cron::{CronJobPayload, CronJobState}; + + fn sample_job(schedule: CronSchedule) -> CronJob { + CronJob { + id: "cron_test".to_string(), + name: "test".to_string(), + schedule, + payload: CronJobPayload { + text: "hello".to_string(), + }, + enabled: true, + session_id: "session_1".to_string(), + workspace_path: "E:/workspace".to_string(), + created_at_ms: 1_700_000_000_000, + config_updated_at_ms: 1_700_000_000_000, + updated_at_ms: 1_700_000_000_000, + state: CronJobState::default(), + } + } + + #[test] + fn every_schedule_keeps_anchor_alignment() { + let next = compute_next_run_after_ms( + &CronSchedule::Every { + every_ms: 60_000, + anchor_ms: Some(1_000), + }, + 1_000, + 181_000, + ) + .expect("next run"); + + assert_eq!(next, Some(241_000)); + } + + #[test] + fn initial_at_schedule_runs_even_if_time_has_passed() { + let mut job = sample_job(CronSchedule::At { + at: "2026-03-17T08:00:00+08:00".to_string(), + }); + job.created_at_ms = 1_763_667_200_000; + + let next = compute_initial_next_run_at_ms(&job, 1_763_700_000_000).expect("initial next"); + assert_eq!(next, Some(1_773_705_600_000)); + } + + #[test] + fn cron_schedule_respects_timezone() { + let after_ms = Utc + .with_ymd_and_hms(2026, 3, 17, 0, 30, 0) + .single() + .expect("valid datetime") + .timestamp_millis(); + + let next = compute_next_run_after_ms( + &CronSchedule::Cron { + expr: "0 8 * * *".to_string(), + tz: Some("Asia/Shanghai".to_string()), + }, + after_ms, + after_ms, + ) + .expect("cron next"); + + let expected = Utc + .with_ymd_and_hms(2026, 3, 18, 0, 0, 0) + .single() + .expect("valid datetime") + .timestamp_millis(); + + assert_eq!(next, Some(expected)); + } +} diff --git a/src/crates/core/src/service/cron/service.rs b/src/crates/core/src/service/cron/service.rs new file mode 100644 index 00000000..f22aba6e --- /dev/null +++ b/src/crates/core/src/service/cron/service.rs @@ -0,0 +1,664 @@ +//! Scheduled job service. + +use super::schedule::{compute_initial_next_run_at_ms, compute_next_run_after_ms, validate_schedule}; +use super::store::CronJobStore; +use super::types::{ + CreateCronJobRequest, CronJob, CronJobPayload, CronJobRunStatus, CronSchedule, + DEFAULT_RETRY_DELAY_MS, UpdateCronJobRequest, +}; +use crate::agentic::coordination::{ + DialogQueuePriority, DialogScheduler, DialogSubmissionPolicy, DialogTriggerSource, +}; +use crate::infrastructure::PathManager; +use crate::util::errors::{BitFunError, BitFunResult}; +use chrono::Utc; +use log::{debug, info, warn}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, OnceLock}; +use tokio::sync::{Mutex, Notify, RwLock}; +use tokio::time::Duration; +use uuid::Uuid; + +static GLOBAL_CRON_SERVICE: OnceLock> = OnceLock::new(); + +pub struct CronService { + scheduler: Arc, + store: Arc, + jobs: Arc>>, + mutation_lock: Arc>, + wakeup: Arc, + runner_started: AtomicBool, +} + +impl CronService { + pub async fn new( + path_manager: Arc, + scheduler: Arc, + ) -> BitFunResult> { + let store = Arc::new(CronJobStore::new(path_manager).await?); + let loaded = store.load().await?; + let current_ms = now_ms(); + + let mut jobs = HashMap::new(); + let mut needs_save = false; + + for mut job in loaded.jobs { + if jobs.contains_key(&job.id) { + return Err(BitFunError::service(format!( + "Duplicate scheduled job id found in jobs.json: {}", + job.id + ))); + } + + needs_save |= reconcile_loaded_job(&mut job, current_ms)?; + jobs.insert(job.id.clone(), job); + } + + let service = Arc::new(Self { + scheduler, + store, + jobs: Arc::new(RwLock::new(jobs)), + mutation_lock: Arc::new(Mutex::new(())), + wakeup: Arc::new(Notify::new()), + runner_started: AtomicBool::new(false), + }); + + if needs_save { + service.persist_snapshot().await?; + } + + Ok(service) + } + + pub fn start(self: &Arc) { + if self + .runner_started + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return; + } + + let service = Arc::clone(self); + tokio::spawn(async move { + service.run_loop().await; + }); + } + + pub async fn list_jobs(&self) -> Vec { + let jobs = self.jobs.read().await; + jobs.values().cloned().collect::>() + } + + pub async fn list_jobs_filtered( + &self, + workspace_path: Option<&str>, + session_id: Option<&str>, + ) -> Vec { + let jobs = self.jobs.read().await; + jobs + .values() + .filter(|job| { + workspace_path + .map(|workspace_path| job.workspace_path == workspace_path) + .unwrap_or(true) + && session_id + .map(|session_id| job.session_id == session_id) + .unwrap_or(true) + }) + .cloned() + .collect::>() + } + + pub async fn get_job(&self, job_id: &str) -> Option { + self.jobs.read().await.get(job_id).cloned() + } + + pub async fn create_job(&self, request: CreateCronJobRequest) -> BitFunResult { + let _guard = self.mutation_lock.lock().await; + let mut jobs = self.jobs.write().await; + let current_ms = now_ms(); + let schedule = materialize_schedule(request.schedule, current_ms); + validate_request_fields( + &request.name, + &request.payload, + &request.session_id, + &request.workspace_path, + )?; + validate_schedule(&schedule, current_ms)?; + + let mut job = CronJob { + id: format!("cron_{}", Uuid::new_v4().simple()), + name: request.name.trim().to_string(), + schedule, + payload: request.payload, + enabled: request.enabled, + session_id: request.session_id.trim().to_string(), + workspace_path: request.workspace_path.trim().to_string(), + created_at_ms: current_ms, + config_updated_at_ms: current_ms, + updated_at_ms: current_ms, + state: Default::default(), + }; + + if job.enabled { + job.state.next_run_at_ms = compute_initial_next_run_at_ms(&job, current_ms)?; + } + + jobs.insert(job.id.clone(), job.clone()); + self.persist_jobs_locked(&jobs).await?; + drop(jobs); + self.wakeup.notify_one(); + + Ok(job) + } + + pub async fn update_job( + &self, + job_id: &str, + request: UpdateCronJobRequest, + ) -> BitFunResult { + let _guard = self.mutation_lock.lock().await; + let mut jobs = self.jobs.write().await; + let current_ms = now_ms(); + let job = jobs + .get_mut(job_id) + .ok_or_else(|| BitFunError::NotFound(format!("Scheduled job not found: {}", job_id)))?; + + if let Some(name) = request.name { + job.name = name.trim().to_string(); + } + if let Some(payload) = request.payload { + job.payload = payload; + } + if let Some(session_id) = request.session_id { + job.session_id = session_id.trim().to_string(); + } + if let Some(workspace_path) = request.workspace_path { + job.workspace_path = workspace_path.trim().to_string(); + } + if let Some(enabled) = request.enabled { + job.enabled = enabled; + } + if let Some(schedule) = request.schedule { + job.schedule = materialize_schedule(schedule, current_ms); + } + + validate_request_fields( + &job.name, + &job.payload, + &job.session_id, + &job.workspace_path, + )?; + validate_schedule(&job.schedule, job.created_at_ms)?; + + job.config_updated_at_ms = current_ms; + job.updated_at_ms = current_ms; + job.state.pending_trigger_at_ms = None; + job.state.retry_at_ms = None; + + if !job.enabled { + job.state.next_run_at_ms = None; + } else if job.state.active_turn_id.is_some() { + if job.is_one_shot() { + job.state.next_run_at_ms = None; + } else { + job.state.next_run_at_ms = + compute_next_run_after_ms(&job.schedule, job.created_at_ms, current_ms)?; + } + } else { + job.state.next_run_at_ms = compute_initial_next_run_at_ms(job, current_ms)?; + } + + let updated = job.clone(); + self.persist_jobs_locked(&jobs).await?; + drop(jobs); + self.wakeup.notify_one(); + + Ok(updated) + } + + pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> BitFunResult { + self.update_job( + job_id, + UpdateCronJobRequest { + enabled: Some(enabled), + ..Default::default() + }, + ) + .await + } + + pub async fn delete_job(&self, job_id: &str) -> BitFunResult { + let _guard = self.mutation_lock.lock().await; + let mut jobs = self.jobs.write().await; + let existed = jobs.remove(job_id).is_some(); + if existed { + self.persist_jobs_locked(&jobs).await?; + drop(jobs); + self.wakeup.notify_one(); + } + Ok(existed) + } + + pub async fn handle_turn_started(&self, turn_id: &str) -> BitFunResult<()> { + self.handle_turn_state_change(turn_id, |job, now_ms| { + job.state.last_run_status = Some(CronJobRunStatus::Running); + job.state.last_run_started_at_ms = Some(now_ms); + job.updated_at_ms = now_ms; + }) + .await + } + + pub async fn handle_turn_completed(&self, turn_id: &str, duration_ms: u64) -> BitFunResult<()> { + self.handle_turn_state_change(turn_id, |job, now_ms| { + job.state.active_turn_id = None; + job.state.last_run_status = Some(CronJobRunStatus::Ok); + job.state.last_error = None; + job.state.last_duration_ms = Some(duration_ms); + job.state.last_run_finished_at_ms = Some(now_ms); + job.state.last_run_started_at_ms = Some(now_ms.saturating_sub(duration_ms as i64)); + job.state.consecutive_failures = 0; + job.updated_at_ms = now_ms; + }) + .await + } + + pub async fn handle_turn_failed(&self, turn_id: &str, error: &str) -> BitFunResult<()> { + self.handle_turn_state_change(turn_id, |job, now_ms| { + job.state.active_turn_id = None; + job.state.last_run_status = Some(CronJobRunStatus::Error); + job.state.last_error = Some(error.to_string()); + job.state.last_run_finished_at_ms = Some(now_ms); + job.state.consecutive_failures = job.state.consecutive_failures.saturating_add(1); + job.updated_at_ms = now_ms; + }) + .await + } + + pub async fn handle_turn_cancelled(&self, turn_id: &str) -> BitFunResult<()> { + self.handle_turn_state_change(turn_id, |job, now_ms| { + job.state.active_turn_id = None; + job.state.last_run_status = Some(CronJobRunStatus::Cancelled); + job.state.last_error = None; + job.state.last_run_finished_at_ms = Some(now_ms); + job.updated_at_ms = now_ms; + }) + .await + } + + async fn handle_turn_state_change(&self, turn_id: &str, update: F) -> BitFunResult<()> + where + F: FnOnce(&mut CronJob, i64), + { + let _guard = self.mutation_lock.lock().await; + let mut jobs = self.jobs.write().await; + let Some(job) = jobs + .values_mut() + .find(|job| job.state.active_turn_id.as_deref() == Some(turn_id)) + else { + return Ok(()); + }; + + update(job, now_ms()); + self.persist_jobs_locked(&jobs).await?; + drop(jobs); + self.wakeup.notify_one(); + Ok(()) + } + + async fn run_loop(self: Arc) { + info!("Cron service loop started"); + + loop { + match self.next_wakeup_at().await { + Some(next_wakeup_ms) => { + let current_ms = now_ms(); + if next_wakeup_ms > current_ms { + let sleep_ms = (next_wakeup_ms - current_ms) as u64; + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(sleep_ms)) => {} + _ = self.wakeup.notified() => { + continue; + } + } + } + } + None => { + self.wakeup.notified().await; + continue; + } + } + + if let Err(error) = self.process_due_jobs().await { + warn!("Failed to process due scheduled jobs: {}", error); + tokio::time::sleep(Duration::from_millis(1_000)).await; + } + } + } + + async fn next_wakeup_at(&self) -> Option { + let jobs = self.jobs.read().await; + jobs.values().filter_map(next_wakeup_for_job).min() + } + + async fn process_due_jobs(&self) -> BitFunResult<()> { + let current_ms = now_ms(); + let due_job_ids = { + let jobs = self.jobs.read().await; + let mut due = jobs + .values() + .filter_map(|job| { + let wake_at = next_wakeup_for_job(job)?; + (wake_at <= current_ms).then(|| (job.id.clone(), wake_at)) + }) + .collect::>(); + due.sort_by(|left, right| left.1.cmp(&right.1).then_with(|| left.0.cmp(&right.0))); + due.into_iter().map(|(job_id, _)| job_id).collect::>() + }; + + for job_id in due_job_ids { + self.process_job(&job_id).await?; + } + + Ok(()) + } + + async fn process_job(&self, job_id: &str) -> BitFunResult<()> { + let _guard = self.mutation_lock.lock().await; + let mut jobs = self.jobs.write().await; + let current_ms = now_ms(); + + let mut should_persist = false; + let mut should_attempt_enqueue = false; + let mut scheduled_at_ms = None; + let mut enqueue_input = None; + + { + let Some(job) = jobs.get_mut(job_id) else { + return Ok(()); + }; + + if !job.enabled && job.state.pending_trigger_at_ms.is_none() { + return Ok(()); + } + + if let Some(next_run_at_ms) = job.state.next_run_at_ms { + if next_run_at_ms <= current_ms { + if job.state.active_turn_id.is_some() || job.state.pending_trigger_at_ms.is_some() { + job.state.last_trigger_at_ms = Some(next_run_at_ms); + job.state.coalesced_run_count = + job.state.coalesced_run_count.saturating_add(1); + job.state.next_run_at_ms = + compute_next_run_after_ms(&job.schedule, job.created_at_ms, current_ms)?; + job.updated_at_ms = current_ms; + should_persist = true; + } else { + job.state.pending_trigger_at_ms = Some(next_run_at_ms); + job.state.last_trigger_at_ms = Some(next_run_at_ms); + job.state.retry_at_ms = None; + job.state.next_run_at_ms = + compute_next_run_after_ms(&job.schedule, job.created_at_ms, current_ms)?; + job.updated_at_ms = current_ms; + should_persist = true; + } + } + } + + if job.state.active_turn_id.is_none() && pending_is_due(job, current_ms) { + let pending_trigger_at_ms = job.state.pending_trigger_at_ms.ok_or_else(|| { + BitFunError::service(format!( + "Scheduled job {} is missing pending trigger timestamp", + job.id + )) + })?; + + let turn_id = format!("cronjob_{}_{}", job.id, pending_trigger_at_ms); + scheduled_at_ms = Some(pending_trigger_at_ms); + enqueue_input = Some(EnqueueInput { + turn_id, + session_id: job.session_id.clone(), + workspace_path: job.workspace_path.clone(), + user_input: job.payload.text.clone(), + }); + should_attempt_enqueue = true; + } + } + + if should_persist { + self.persist_jobs_locked(&jobs).await?; + } + + if !should_attempt_enqueue { + return Ok(()); + } + + let enqueue_input = enqueue_input.ok_or_else(|| { + BitFunError::service(format!( + "Scheduled job {} is missing enqueue input after due calculation", + job_id + )) + })?; + let scheduled_at_ms = scheduled_at_ms.ok_or_else(|| { + BitFunError::service(format!( + "Scheduled job {} is missing scheduled timestamp after due calculation", + job_id + )) + })?; + + let submit_result = self + .scheduler + .submit( + enqueue_input.session_id.clone(), + enqueue_input.user_input, + None, + Some(enqueue_input.turn_id.clone()), + String::new(), + Some(enqueue_input.workspace_path), + scheduled_job_policy(), + None, + ) + .await; + + let now_after_submit = now_ms(); + let Some(job) = jobs.get_mut(job_id) else { + return Ok(()); + }; + + match submit_result { + Ok(()) => { + job.state.active_turn_id = Some(enqueue_input.turn_id); + job.state.pending_trigger_at_ms = None; + job.state.retry_at_ms = None; + job.state.last_enqueued_at_ms = Some(now_after_submit); + job.state.last_run_status = Some(CronJobRunStatus::Queued); + job.state.last_error = None; + job.updated_at_ms = now_after_submit; + + if job.is_one_shot() { + job.enabled = false; + job.state.next_run_at_ms = None; + } + + debug!( + "Scheduled job enqueued: job_id={}, session_id={}, scheduled_at_ms={}", + job.id, job.session_id, scheduled_at_ms + ); + } + Err(error) => { + job.state.last_run_status = Some(CronJobRunStatus::Error); + job.state.last_error = Some(error.clone()); + job.state.last_run_finished_at_ms = Some(now_after_submit); + job.state.retry_at_ms = Some(now_after_submit + DEFAULT_RETRY_DELAY_MS); + job.state.consecutive_failures = job.state.consecutive_failures.saturating_add(1); + job.updated_at_ms = now_after_submit; + + warn!( + "Failed to enqueue scheduled job: job_id={}, session_id={}, error={}", + job.id, job.session_id, error + ); + } + } + + self.persist_jobs_locked(&jobs).await?; + drop(jobs); + self.wakeup.notify_one(); + Ok(()) + } + + async fn persist_snapshot(&self) -> BitFunResult<()> { + let jobs = self.jobs.read().await; + self.persist_jobs_locked(&jobs).await + } + + async fn persist_jobs_locked( + &self, + jobs: &HashMap, + ) -> BitFunResult<()> { + self.store + .save_jobs(jobs.values().cloned().collect::>()) + .await + } +} + +pub fn get_global_cron_service() -> Option> { + GLOBAL_CRON_SERVICE.get().cloned() +} + +pub fn set_global_cron_service(service: Arc) { + let _ = GLOBAL_CRON_SERVICE.set(service); +} + +fn reconcile_loaded_job(job: &mut CronJob, now_ms: i64) -> BitFunResult { + let original = job.clone(); + + validate_request_fields( + &job.name, + &job.payload, + &job.session_id, + &job.workspace_path, + )?; + validate_schedule(&job.schedule, job.created_at_ms)?; + + if job.updated_at_ms < job.created_at_ms { + job.updated_at_ms = job.created_at_ms; + } + + if let CronSchedule::Every { anchor_ms, .. } = &mut job.schedule { + if anchor_ms.is_none() { + *anchor_ms = Some(job.created_at_ms); + } + } + + if job.state.active_turn_id.is_some() { + job.state.active_turn_id = None; + job.state.pending_trigger_at_ms = None; + job.state.retry_at_ms = None; + job.state.last_run_status = Some(CronJobRunStatus::Error); + job.state.last_error = + Some("Application restarted before the scheduled job finished".to_string()); + job.state.last_run_finished_at_ms = Some(now_ms); + job.state.consecutive_failures = job.state.consecutive_failures.saturating_add(1); + job.updated_at_ms = now_ms; + } + + if !job.enabled { + job.state.next_run_at_ms = None; + job.state.pending_trigger_at_ms = None; + job.state.retry_at_ms = None; + } else if job.state.pending_trigger_at_ms.is_some() { + if job.state.retry_at_ms.is_none() { + job.state.retry_at_ms = Some(now_ms); + } + } else if job.state.next_run_at_ms.is_none() { + job.state.next_run_at_ms = compute_initial_next_run_at_ms(job, now_ms)?; + } + + Ok(job != &original) +} + +fn validate_request_fields( + name: &str, + payload: &CronJobPayload, + session_id: &str, + workspace_path: &str, +) -> BitFunResult<()> { + if name.trim().is_empty() { + return Err(BitFunError::validation( + "Scheduled job name must not be empty", + )); + } + if payload.text.trim().is_empty() { + return Err(BitFunError::validation( + "Scheduled job payload.text must not be empty", + )); + } + if session_id.trim().is_empty() { + return Err(BitFunError::validation( + "Scheduled job sessionId must not be empty", + )); + } + if workspace_path.trim().is_empty() { + return Err(BitFunError::validation( + "Scheduled job workspacePath must not be empty", + )); + } + + Ok(()) +} + +fn materialize_schedule(schedule: CronSchedule, anchor_ms: i64) -> CronSchedule { + match schedule { + CronSchedule::Every { + every_ms, + anchor_ms: None, + } => CronSchedule::Every { + every_ms, + anchor_ms: Some(anchor_ms), + }, + other => other, + } +} + +fn pending_is_due(job: &CronJob, now_ms: i64) -> bool { + let Some(pending_trigger_at_ms) = job.state.pending_trigger_at_ms else { + return false; + }; + + let retry_at_ms = job.state.retry_at_ms.unwrap_or(pending_trigger_at_ms); + retry_at_ms <= now_ms +} + +fn next_wakeup_for_job(job: &CronJob) -> Option { + let schedule_wakeup = job.state.next_run_at_ms; + let retry_wakeup = job.state.pending_trigger_at_ms.map(|pending_trigger_at_ms| { + job.state.retry_at_ms.unwrap_or(pending_trigger_at_ms) + }); + + match (schedule_wakeup, retry_wakeup) { + (Some(left), Some(right)) => Some(left.min(right)), + (Some(left), None) => Some(left), + (None, Some(right)) => Some(right), + (None, None) => None, + } +} + +fn scheduled_job_policy() -> DialogSubmissionPolicy { + DialogSubmissionPolicy::new( + DialogTriggerSource::ScheduledJob, + DialogQueuePriority::Low, + true, + ) +} + +fn now_ms() -> i64 { + Utc::now().timestamp_millis() +} + +struct EnqueueInput { + turn_id: String, + session_id: String, + workspace_path: String, + user_input: String, +} diff --git a/src/crates/core/src/service/cron/store.rs b/src/crates/core/src/service/cron/store.rs new file mode 100644 index 00000000..2894901f --- /dev/null +++ b/src/crates/core/src/service/cron/store.rs @@ -0,0 +1,61 @@ +//! jobs.json persistence wrapper. + +use super::types::{CronJob, CronJobsFile, CRON_JOBS_VERSION}; +use crate::infrastructure::storage::{PersistenceService, StorageOptions}; +use crate::infrastructure::PathManager; +use crate::util::errors::{BitFunError, BitFunResult}; +use std::sync::Arc; + +pub struct CronJobStore { + persistence: PersistenceService, + path_manager: Arc, +} + +impl CronJobStore { + pub async fn new(path_manager: Arc) -> BitFunResult { + let cron_dir = path_manager.user_cron_dir(); + path_manager.ensure_dir(&cron_dir).await?; + + let persistence = PersistenceService::new(cron_dir).await?; + + Ok(Self { + persistence, + path_manager, + }) + } + + pub fn jobs_file_path(&self) -> std::path::PathBuf { + self.path_manager.cron_jobs_file() + } + + pub async fn load(&self) -> BitFunResult { + let data = self.persistence.load_json::("jobs").await?; + match data { + Some(file) if file.version == CRON_JOBS_VERSION => Ok(file), + Some(file) => Err(BitFunError::service(format!( + "Unsupported cron jobs file version {} in {:?}", + file.version, + self.jobs_file_path() + ))), + None => Ok(CronJobsFile::default()), + } + } + + pub async fn save_jobs(&self, jobs: Vec) -> BitFunResult<()> { + let mut jobs = jobs; + jobs.sort_by(|left, right| { + left.created_at_ms + .cmp(&right.created_at_ms) + .then_with(|| left.id.cmp(&right.id)) + }); + + let data = CronJobsFile { + version: CRON_JOBS_VERSION, + jobs, + }; + + self.persistence + .save_json("jobs", &data, StorageOptions::default()) + .await + } +} diff --git a/src/crates/core/src/service/cron/subscriber.rs b/src/crates/core/src/service/cron/subscriber.rs new file mode 100644 index 00000000..e8544e37 --- /dev/null +++ b/src/crates/core/src/service/cron/subscriber.rs @@ -0,0 +1,50 @@ +//! Scheduled job event subscriber. + +use super::service::CronService; +use crate::agentic::events::{AgenticEvent, EventSubscriber}; +use crate::util::errors::BitFunResult; +use log::error; +use std::sync::Arc; + +pub struct CronEventSubscriber { + cron_service: Arc, +} + +impl CronEventSubscriber { + pub fn new(cron_service: Arc) -> Self { + Self { cron_service } + } +} + +#[async_trait::async_trait] +impl EventSubscriber for CronEventSubscriber { + async fn on_event(&self, event: &AgenticEvent) -> BitFunResult<()> { + let result = match event { + AgenticEvent::DialogTurnStarted { turn_id, .. } => { + self.cron_service.handle_turn_started(turn_id).await + } + AgenticEvent::DialogTurnCompleted { + turn_id, + duration_ms, + .. + } => { + self.cron_service + .handle_turn_completed(turn_id, *duration_ms) + .await + } + AgenticEvent::DialogTurnFailed { + turn_id, error, .. + } => self.cron_service.handle_turn_failed(turn_id, error).await, + AgenticEvent::DialogTurnCancelled { turn_id, .. } => { + self.cron_service.handle_turn_cancelled(turn_id).await + } + _ => Ok(()), + }; + + if let Err(error) = &result { + error!("Failed to update scheduled job state from event: {}", error); + } + + result + } +} diff --git a/src/crates/core/src/service/cron/types.rs b/src/crates/core/src/service/cron/types.rs new file mode 100644 index 00000000..29005b53 --- /dev/null +++ b/src/crates/core/src/service/cron/types.rs @@ -0,0 +1,139 @@ +//! Scheduled job data types. + +use serde::{Deserialize, Serialize}; + +pub const CRON_JOBS_VERSION: u32 = 1; + +pub const DEFAULT_RETRY_DELAY_MS: i64 = 5_000; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CronJobsFile { + pub version: u32, + pub jobs: Vec, +} + +impl Default for CronJobsFile { + fn default() -> Self { + Self { + version: CRON_JOBS_VERSION, + jobs: Vec::new(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CronJob { + pub id: String, + pub name: String, + pub schedule: CronSchedule, + pub payload: CronJobPayload, + pub enabled: bool, + pub session_id: String, + pub workspace_path: String, + pub created_at_ms: i64, + pub config_updated_at_ms: i64, + pub updated_at_ms: i64, + #[serde(default)] + pub state: CronJobState, +} + +impl CronJob { + pub fn is_one_shot(&self) -> bool { + matches!(self.schedule, CronSchedule::At { .. }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "lowercase")] +pub enum CronSchedule { + At { + at: String, + }, + Every { + #[serde(rename = "everyMs")] + every_ms: u64, + #[serde(rename = "anchorMs", skip_serializing_if = "Option::is_none")] + anchor_ms: Option, + }, + Cron { + expr: String, + #[serde(skip_serializing_if = "Option::is_none")] + tz: Option, + }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CronJobPayload { + pub text: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CronJobRunStatus { + Queued, + Running, + Ok, + Error, + Cancelled, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CronJobState { + #[serde(skip_serializing_if = "Option::is_none")] + pub next_run_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_trigger_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub retry_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_trigger_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_enqueued_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_run_started_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_run_finished_at_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_duration_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_run_status: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub active_turn_id: Option, + #[serde(default)] + pub consecutive_failures: u32, + #[serde(default)] + pub coalesced_run_count: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateCronJobRequest { + pub name: String, + pub schedule: CronSchedule, + pub payload: CronJobPayload, + #[serde(default = "default_enabled")] + pub enabled: bool, + pub session_id: String, + pub workspace_path: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateCronJobRequest { + pub name: Option, + pub schedule: Option, + pub payload: Option, + pub enabled: Option, + pub session_id: Option, + pub workspace_path: Option, +} + +const fn default_enabled() -> bool { + true +} diff --git a/src/crates/core/src/service/mod.rs b/src/crates/core/src/service/mod.rs index 2b9c91ed..a99cb452 100644 --- a/src/crates/core/src/service/mod.rs +++ b/src/crates/core/src/service/mod.rs @@ -7,6 +7,7 @@ pub mod ai_memory; // AI memory point management pub mod ai_rules; // AI rules management pub(crate) mod bootstrap; // Workspace persona bootstrap helpers pub mod config; // Config management +pub mod cron; // Scheduled jobs pub mod diff; pub mod filesystem; // FileSystem management pub mod git; // Git service @@ -30,6 +31,9 @@ pub use ai_memory::{AIMemory, AIMemoryManager, MemoryType}; pub use ai_rules::AIRulesService; pub use bootstrap::reset_workspace_persona_files_to_default; pub use config::{ConfigManager, ConfigProvider, ConfigService}; +pub use cron::{ + get_global_cron_service, set_global_cron_service, CronEventSubscriber, CronService, +}; pub use diff::{ DiffConfig, DiffHunk, DiffLine, DiffLineType, DiffOptions, DiffResult, DiffService, }; diff --git a/src/web-ui/src/app/components/NavPanel/MainNav.tsx b/src/web-ui/src/app/components/NavPanel/MainNav.tsx index 22928cdd..666612ea 100644 --- a/src/web-ui/src/app/components/NavPanel/MainNav.tsx +++ b/src/web-ui/src/app/components/NavPanel/MainNav.tsx @@ -13,7 +13,7 @@ import React, { useCallback, useState, useMemo, useEffect, useRef } from 'react'; import { createPortal } from 'react-dom'; -import { Plus, FolderOpen, FolderPlus, History, Check, Bot } from 'lucide-react'; +import { Plus, FolderOpen, FolderPlus, History, Check, Bot, Clock3 } from 'lucide-react'; import { Badge, Tooltip } from '@/component-library'; import { useApp } from '../../hooks/useApp'; import { useSceneManager } from '../../hooks/useSceneManager'; @@ -28,6 +28,7 @@ import NavItem from './components/NavItem'; import SectionHeader from './components/SectionHeader'; import MiniAppEntry from './components/MiniAppEntry'; import WorkspaceListSection from './sections/workspaces/WorkspaceListSection'; +import ScheduledJobsDialog from '../ScheduledJobsDialog/ScheduledJobsDialog'; import { useSceneStore } from '../../stores/sceneStore'; import { useMyAgentStore } from '../../scenes/my-agent/myAgentStore'; import { useMiniAppCatalogSync } from '../../scenes/miniapps/hooks/useMiniAppCatalogSync'; @@ -92,6 +93,7 @@ const MainNav: React.FC = ({ const openNavScene = useNavSceneStore(s => s.openNavScene); const activeTabId = useSceneStore(s => s.activeTabId); const setMyAgentView = useMyAgentStore(s => s.setActiveView); + const selectedAssistantWorkspaceId = useMyAgentStore((s) => s.selectedAssistantWorkspaceId); const setSelectedAssistantWorkspaceId = useMyAgentStore((s) => s.setSelectedAssistantWorkspaceId); const { t } = useI18n('common'); const { @@ -211,12 +213,59 @@ const MainNav: React.FC = ({ () => assistantWorkspacesList.find(workspace => !workspace.assistantId) ?? assistantWorkspacesList[0] ?? null, [assistantWorkspacesList] ); + const selectedAssistantWorkspace = useMemo(() => { + if (!selectedAssistantWorkspaceId) { + return null; + } + + return assistantWorkspacesList.find( + (workspace) => workspace.id === selectedAssistantWorkspaceId + ) ?? null; + }, [assistantWorkspacesList, selectedAssistantWorkspaceId]); + const resolvedAssistantWorkspace = useMemo(() => { + if (isAssistantWorkspaceActive && currentWorkspace?.workspaceKind === WorkspaceKind.Assistant) { + return currentWorkspace; + } + + if (selectedAssistantWorkspace) { + return selectedAssistantWorkspace; + } + + return defaultAssistantWorkspace; + }, [ + currentWorkspace, + defaultAssistantWorkspace, + isAssistantWorkspaceActive, + selectedAssistantWorkspace, + ]); + const resolvedAssistantDisplayName = useMemo(() => { + if (!resolvedAssistantWorkspace) { + return ''; + } + + return resolvedAssistantWorkspace.identity?.name?.trim() || resolvedAssistantWorkspace.name; + }, [resolvedAssistantWorkspace]); + const resolvedAssistantSessionId = useMemo(() => { + const workspacePath = resolvedAssistantWorkspace?.rootPath; + if (!workspacePath) { + return undefined; + } + + const workspaceSessions = Array.from(flowChatStore.getState().sessions.values()) + .filter(session => + (session.workspacePath || workspacePath) === workspacePath && !session.parentSessionId + ) + .sort(compareSessionsForDisplay); + + return workspaceSessions[0]?.sessionId; + }, [resolvedAssistantWorkspace]); const [defaultSessionMode, setDefaultSessionMode] = useState<'code' | 'cowork'>('code'); const [navDisplayMode, setNavDisplayMode] = useState(getInitialNavDisplayMode); const [isModeSwitching, setIsModeSwitching] = useState(false); const [modeLogoSrc, setModeLogoSrc] = useState('/panda_1.png'); const [modeLogoHoverSrc, setModeLogoHoverSrc] = useState('/panda_2.png'); + const [isScheduledJobsDialogOpen, setIsScheduledJobsDialogOpen] = useState(false); const modeSwitchTimerRef = useRef(null); const modeSwitchSwapTimerRef = useRef(null); @@ -413,6 +462,13 @@ const MainNav: React.FC = ({ switchLeftPanelTab, ]); + const handleOpenScheduledJobsDialog = useCallback(() => { + if (resolvedAssistantWorkspace?.id) { + setSelectedAssistantWorkspaceId(resolvedAssistantWorkspace.id); + } + setIsScheduledJobsDialogOpen(true); + }, [resolvedAssistantWorkspace, setSelectedAssistantWorkspaceId]); + const handleOpenProModeSession = useCallback(async () => { // 找到项目工作区(非 assistant 类型) const projectWorkspaces = openedWorkspacesList.filter( @@ -593,6 +649,7 @@ const MainNav: React.FC = ({ const personaTooltip = t('nav.items.persona'); const createSessionTooltip = t('nav.sessions.newClawSession'); const createAssistantTooltip = t('nav.workspaces.actions.newAssistant'); + const scheduledJobsTooltip = t('nav.scheduledJobs.open'); const openProjectTooltip = t('header.openProject'); const createCodeTooltip = t('nav.sessions.newCodeSession'); const createCoworkTooltip = t('nav.sessions.newCoworkSession'); @@ -664,17 +721,31 @@ const MainNav: React.FC = ({
{isAssistantNavMode ? ( - - - + <> + + + + + + + ) : ( <> @@ -830,6 +901,15 @@ const MainNav: React.FC = ({
{workspaceMenuPortal} + setIsScheduledJobsDialogOpen(false)} + targetWorkspacePath={resolvedAssistantWorkspace?.rootPath} + targetSessionId={resolvedAssistantSessionId} + assistantName={resolvedAssistantDisplayName} + hideTargetFields + listScope="workspace" + /> ); }; diff --git a/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.scss b/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.scss new file mode 100644 index 00000000..19d2b872 --- /dev/null +++ b/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.scss @@ -0,0 +1,330 @@ +@use '../../../component-library/styles/tokens.scss' as *; + +.bitfun-scheduled-jobs-dialog { + display: grid; + grid-template-columns: 320px minmax(0, 1fr); + min-height: 620px; + max-height: 78vh; + background: var(--color-bg-primary); + + &__sidebar { + border-right: 1px solid var(--color-border-default); + background: color-mix(in srgb, var(--element-bg-soft) 78%, transparent); + display: flex; + flex-direction: column; + min-height: 0; + } + + &__sidebar-header, + &__editor-header { + display: flex; + justify-content: space-between; + gap: 12px; + padding: 16px 18px; + border-bottom: 1px solid var(--color-border-default); + } + + &__sidebar-header { + align-items: flex-start; + } + + &__editor-header { + align-items: center; + } + + &__sidebar-title, + &__editor-title { + display: inline-flex; + align-items: center; + gap: 8px; + font-size: 14px; + font-weight: 700; + color: var(--color-text-primary); + } + + &__sidebar-subtitle, + &__editor-subtitle, + &__title-target { + font-size: 12px; + color: var(--color-text-secondary); + } + + &__title-target { + display: inline-flex; + align-items: center; + padding: 4px 8px; + border-radius: 999px; + background: color-mix(in srgb, var(--element-bg-soft) 86%, transparent); + } + + &__sidebar-actions, + &__editor-actions { + display: inline-flex; + align-items: center; + gap: 8px; + } + + &__job-list { + flex: 1; + min-height: 0; + overflow: auto; + padding: 12px; + display: flex; + flex-direction: column; + gap: 10px; + } + + &__job-card { + position: relative; + display: grid; + grid-template-columns: minmax(0, 1fr) auto; + gap: 12px; + padding: 12px; + border: 1px solid transparent; + border-radius: 12px; + background: transparent; + cursor: pointer; + transition: border-color $motion-fast $easing-standard, + box-shadow $motion-fast $easing-standard, + background $motion-fast $easing-standard, + transform $motion-fast $easing-standard; + + &:hover { + border-color: color-mix( + in srgb, + var(--color-primary, var(--color-accent-500)) 24%, + var(--color-border-default) + ); + background: color-mix(in srgb, var(--element-bg-soft) 86%, transparent); + transform: translateY(-1px); + } + + &.is-selected { + border-color: color-mix( + in srgb, + var(--color-primary, var(--color-accent-500)) 52%, + var(--color-border-default) + ); + background: color-mix( + in srgb, + var(--color-primary, var(--color-accent-500)) 18%, + var(--element-bg-base) + ); + box-shadow: + inset 3px 0 0 var(--color-primary, var(--color-accent-500)), + 0 0 0 1px color-mix( + in srgb, + var(--color-primary, var(--color-accent-500)) 18%, + transparent + ); + } + + &.is-selected .bitfun-scheduled-jobs-dialog__job-name { + color: color-mix( + in srgb, + var(--color-primary, var(--color-accent-500)) 65%, + var(--color-text-primary) + ); + } + } + + &__job-main { + min-width: 0; + display: flex; + flex-direction: column; + gap: 6px; + } + + &__job-name-row { + display: flex; + align-items: center; + gap: 8px; + justify-content: space-between; + } + + &__job-name { + font-size: 13px; + font-weight: 700; + color: var(--color-text-primary); + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + } + + &__job-meta { + font-size: 12px; + color: var(--color-text-secondary); + line-height: 1.4; + } + + &__job-error { + font-size: 12px; + color: var(--color-danger-500); + line-height: 1.4; + word-break: break-word; + } + + &__job-actions { + display: flex; + align-items: flex-start; + gap: 6px; + } + + &__status-badge { + display: inline-flex; + align-items: center; + padding: 3px 8px; + border-radius: 999px; + font-size: 11px; + font-weight: 600; + color: var(--color-text-secondary); + background: color-mix(in srgb, var(--element-bg-medium) 85%, transparent); + white-space: nowrap; + + &.is-ok, + &.is-running, + &.is-queued { + color: var(--color-success-500); + background: color-mix(in srgb, var(--color-success-500) 12%, transparent); + } + + &.is-error, + &.is-cancelled { + color: var(--color-danger-500); + background: color-mix(in srgb, var(--color-danger-500) 10%, transparent); + } + } + + &__icon-btn { + width: 28px; + height: 28px; + display: inline-flex; + align-items: center; + justify-content: center; + border: 1px solid var(--color-border-default); + border-radius: 8px; + background: var(--element-bg-soft); + color: var(--color-text-secondary); + cursor: pointer; + transition: background $motion-fast $easing-standard, + border-color $motion-fast $easing-standard, + color $motion-fast $easing-standard; + + &:hover:not(:disabled) { + background: var(--element-bg-medium); + color: var(--color-text-primary); + } + + &:disabled { + opacity: 0.5; + cursor: not-allowed; + } + + &.is-danger:hover:not(:disabled) { + color: var(--color-danger-500); + border-color: color-mix(in srgb, var(--color-danger-500) 45%, transparent); + } + } + + &__empty { + flex: 1; + min-height: 0; + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + gap: 8px; + padding: 24px; + text-align: center; + color: var(--color-text-secondary); + font-size: 13px; + } + + &__empty-title { + font-size: 14px; + font-weight: 700; + color: var(--color-text-primary); + } + + &__empty-text, + &__warning { + font-size: 12px; + color: var(--color-text-secondary); + line-height: 1.5; + } + + &__editor { + min-width: 0; + display: flex; + flex-direction: column; + min-height: 0; + } + + &__target-grid, + &__form-grid { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 14px; + padding: 16px 18px 0; + } + + &__assistant-banner { + margin: 16px 18px 0; + padding: 14px 16px; + border: 1px solid color-mix(in srgb, var(--color-primary) 24%, transparent); + border-radius: 12px; + background: color-mix(in srgb, var(--color-primary) 6%, transparent); + } + + &__assistant-label { + font-size: 12px; + color: var(--color-text-secondary); + margin-bottom: 6px; + } + + &__assistant-name { + font-size: 14px; + font-weight: 700; + color: var(--color-text-primary); + } + + &__field { + display: flex; + flex-direction: column; + gap: 8px; + } + + &__field-label { + font-size: 12px; + font-weight: 600; + color: var(--color-text-primary); + } + + &__prompt { + padding: 16px 18px 18px; + } + + &__warning { + padding: 0 18px; + color: var(--color-danger-500); + } +} + +@media (max-width: 1120px) { + .bitfun-scheduled-jobs-dialog { + grid-template-columns: 1fr; + max-height: none; + + &__sidebar { + border-right: none; + border-bottom: 1px solid var(--color-border-default); + min-height: 240px; + max-height: 320px; + } + + &__target-grid, + &__form-grid { + grid-template-columns: 1fr; + } + } +} diff --git a/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.tsx b/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.tsx new file mode 100644 index 00000000..6dbddaa3 --- /dev/null +++ b/src/web-ui/src/app/components/ScheduledJobsDialog/ScheduledJobsDialog.tsx @@ -0,0 +1,890 @@ +import React, { useCallback, useEffect, useMemo, useState } from 'react'; +import { Clock3, Plus, RefreshCw, Trash2 } from 'lucide-react'; +import { + Button, + Checkbox, + Input, + Modal, + Select, + Textarea, + confirmDanger, +} from '@/component-library'; +import { + cronAPI, + type CreateCronJobRequest, + type CronJob, + type CronSchedule, + type UpdateCronJobRequest, +} from '@/infrastructure/api'; +import { useI18n } from '@/infrastructure/i18n'; +import { flowChatStore } from '@/flow_chat/store/FlowChatStore'; +import type { FlowChatState, Session } from '@/flow_chat/types/flow-chat'; +import { compareSessionsForDisplay } from '@/flow_chat/utils/sessionOrdering'; +import { notificationService } from '@/shared/notification-system/services/NotificationService'; +import { createLogger } from '@/shared/utils/logger'; +import './ScheduledJobsDialog.scss'; + +const log = createLogger('ScheduledJobsDialog'); +const MINUTE_IN_MS = 60_000; + +type JobListScope = 'workspace' | 'session'; +type ScheduleKind = CronSchedule['kind']; + +interface JobDraft { + name: string; + text: string; + enabled: boolean; + workspacePath: string; + sessionId: string; + scheduleKind: ScheduleKind; + at: string; + everyMinutes: string; + anchorMs: string; + expr: string; + tz: string; +} + +export interface ScheduledJobsDialogProps { + isOpen: boolean; + onClose: () => void; + targetWorkspacePath?: string; + targetSessionId?: string; + assistantName?: string; + hideTargetFields?: boolean; + listScope?: JobListScope; +} + +const createEmptyDraft = (workspacePath = '', sessionId = ''): JobDraft => ({ + name: '', + text: '', + enabled: true, + workspacePath, + sessionId, + scheduleKind: 'cron', + at: getCurrentLocalDateTimeInput(), + everyMinutes: '60', + anchorMs: '', + expr: '0 8 * * *', + tz: '', +}); + +const ScheduledJobsDialog: React.FC = ({ + isOpen, + onClose, + targetWorkspacePath, + targetSessionId, + assistantName, + hideTargetFields = false, + listScope = 'session', +}) => { + const { t } = useI18n('common'); + const [flowChatState, setFlowChatState] = useState(() => flowChatStore.getState()); + const [jobs, setJobs] = useState([]); + const [loading, setLoading] = useState(false); + const [saving, setSaving] = useState(false); + const [selectedJobId, setSelectedJobId] = useState(null); + const [draft, setDraft] = useState(() => + createEmptyDraft(targetWorkspacePath ?? '', targetSessionId ?? '') + ); + + useEffect(() => { + const unsubscribe = flowChatStore.subscribe((state) => { + setFlowChatState(state); + }); + return unsubscribe; + }, []); + + const workspaceSessions = useMemo(() => { + const workspacePath = draft.workspacePath.trim(); + if (!workspacePath) { + return [] as Session[]; + } + + return Array.from(flowChatState.sessions.values()) + .filter((session) => (session.workspacePath || workspacePath) === workspacePath && !session.parentSessionId) + .sort(compareSessionsForDisplay); + }, [draft.workspacePath, flowChatState.sessions]); + + const defaultSessionIdForWorkspace = useMemo( + () => workspaceSessions[0]?.sessionId ?? '', + [workspaceSessions] + ); + const listWorkspacePath = useMemo(() => { + return hideTargetFields + ? (targetWorkspacePath ?? '').trim() + : draft.workspacePath.trim(); + }, [draft.workspacePath, hideTargetFields, targetWorkspacePath]); + const listSessionId = useMemo(() => { + if (listScope !== 'session') { + return ''; + } + + return hideTargetFields + ? (targetSessionId ?? '').trim() + : draft.sessionId.trim(); + }, [draft.sessionId, hideTargetFields, listScope, targetSessionId]); + + const displayTargetLabel = hideTargetFields + ? assistantName || t('nav.scheduledJobs.assistantFallback') + : null; + + const sortedJobs = useMemo(() => { + return [...jobs].sort((left, right) => { + if (left.enabled !== right.enabled) { + return left.enabled ? -1 : 1; + } + + const configUpdatedAtDiff = right.configUpdatedAtMs - left.configUpdatedAtMs; + if (configUpdatedAtDiff !== 0) { + return configUpdatedAtDiff; + } + + const createdAtDiff = right.createdAtMs - left.createdAtMs; + if (createdAtDiff !== 0) { + return createdAtDiff; + } + + return left.id.localeCompare(right.id); + }); + }, [jobs]); + + const selectedJob = useMemo( + () => sortedJobs.find((job) => job.id === selectedJobId) ?? null, + [selectedJobId, sortedJobs] + ); + + const loadJobs = useCallback(async () => { + if (!isOpen) { + return; + } + + const request = + listScope === 'workspace' + ? { workspacePath: listWorkspacePath || undefined } + : { + workspacePath: listWorkspacePath || undefined, + sessionId: listSessionId || undefined, + }; + + setLoading(true); + try { + const result = await cronAPI.listJobs(request); + setJobs(result); + setSelectedJobId((current) => { + if (current && result.some((job) => job.id === current)) { + return current; + } + return result[0]?.id ?? null; + }); + } catch (error) { + log.error('Failed to load scheduled jobs', { error }); + notificationService.error( + t('nav.scheduledJobs.messages.loadFailed', { + error: error instanceof Error ? error.message : String(error), + }) + ); + } finally { + setLoading(false); + } + }, [isOpen, listScope, listSessionId, listWorkspacePath, t]); + + useEffect(() => { + if (!isOpen) { + return; + } + + const initialWorkspacePath = targetWorkspacePath ?? ''; + const initialSessionId = targetSessionId ?? ''; + setDraft((current) => { + const nextDraft = current.name || current.text || current.workspacePath || current.sessionId + ? current + : createEmptyDraft(initialWorkspacePath, initialSessionId); + const desiredWorkspacePath = hideTargetFields + ? initialWorkspacePath + : current.workspacePath || initialWorkspacePath; + const desiredSessionId = hideTargetFields + ? initialSessionId || nextDraft.sessionId + : current.sessionId || initialSessionId; + + if ( + nextDraft.workspacePath === desiredWorkspacePath + && nextDraft.sessionId === desiredSessionId + ) { + return nextDraft; + } + + return { + ...nextDraft, + workspacePath: desiredWorkspacePath, + sessionId: desiredSessionId, + }; + }); + }, [hideTargetFields, isOpen, targetSessionId, targetWorkspacePath]); + + useEffect(() => { + if (!isOpen) { + return; + } + + setDraft((current) => { + if (hideTargetFields) { + const nextSessionId = targetSessionId || current.sessionId || defaultSessionIdForWorkspace; + if ( + current.workspacePath === (targetWorkspacePath ?? '') + && current.sessionId === nextSessionId + ) { + return current; + } + + return { + ...current, + workspacePath: targetWorkspacePath ?? '', + sessionId: nextSessionId, + }; + } + + if (current.workspacePath && current.sessionId && workspaceSessions.some((session) => session.sessionId === current.sessionId)) { + return current; + } + + if (!current.workspacePath) { + return current; + } + + return { + ...current, + sessionId: current.sessionId || defaultSessionIdForWorkspace, + }; + }); + }, [ + defaultSessionIdForWorkspace, + hideTargetFields, + isOpen, + targetSessionId, + targetWorkspacePath, + workspaceSessions, + ]); + + useEffect(() => { + if (!isOpen) { + setJobs([]); + setSelectedJobId(null); + setDraft(createEmptyDraft(targetWorkspacePath ?? '', targetSessionId ?? '')); + return; + } + + void loadJobs(); + }, [isOpen, loadJobs, targetSessionId, targetWorkspacePath]); + + useEffect(() => { + if (!selectedJob) { + return; + } + + setDraft(jobToDraft(selectedJob)); + }, [selectedJob]); + + const handleCreateDraft = useCallback(() => { + setSelectedJobId(null); + setDraft(createEmptyDraft(draft.workspacePath, draft.sessionId || defaultSessionIdForWorkspace)); + }, [defaultSessionIdForWorkspace, draft.sessionId, draft.workspacePath]); + + const handleSelectJob = useCallback((job: CronJob) => { + setSelectedJobId(job.id); + }, []); + + const handleDeleteJob = useCallback(async (job: CronJob) => { + const confirmed = await confirmDanger( + t('nav.scheduledJobs.deleteDialog.title', { name: job.name }), + null + ); + if (!confirmed) { + return; + } + + try { + await cronAPI.deleteJob(job.id); + notificationService.success(t('nav.scheduledJobs.messages.deleteSuccess')); + if (selectedJobId === job.id) { + setSelectedJobId(null); + } + await loadJobs(); + } catch (error) { + log.error('Failed to delete scheduled job', { jobId: job.id, error }); + notificationService.error( + t('nav.scheduledJobs.messages.deleteFailed', { + error: error instanceof Error ? error.message : String(error), + }) + ); + } + }, [loadJobs, selectedJobId, t]); + + const handleToggleEnabled = useCallback(async (job: CronJob, enabled: boolean) => { + try { + await cronAPI.updateJob(job.id, { enabled }); + await loadJobs(); + } catch (error) { + log.error('Failed to toggle scheduled job', { jobId: job.id, error }); + notificationService.error( + t('nav.scheduledJobs.messages.updateFailed', { + error: error instanceof Error ? error.message : String(error), + }) + ); + } + }, [loadJobs, t]); + + const handleSave = useCallback(async () => { + const validationError = validateDraft(draft, t); + if (validationError) { + notificationService.error(validationError); + return; + } + + let schedule: CronSchedule; + try { + schedule = buildScheduleFromDraft(draft); + } catch (error) { + notificationService.error(error instanceof Error ? error.message : String(error)); + return; + } + + setSaving(true); + try { + if (selectedJobId) { + const request: UpdateCronJobRequest = { + name: draft.name.trim(), + payload: { + text: draft.text.trim(), + }, + enabled: draft.enabled, + schedule, + workspacePath: draft.workspacePath.trim(), + sessionId: draft.sessionId.trim(), + }; + const updatedJob = await cronAPI.updateJob(selectedJobId, request); + setSelectedJobId(updatedJob.id); + setDraft(jobToDraft(updatedJob)); + notificationService.success(t('nav.scheduledJobs.messages.updateSuccess')); + } else { + const request: CreateCronJobRequest = { + name: draft.name.trim(), + payload: { + text: draft.text.trim(), + }, + enabled: draft.enabled, + schedule, + workspacePath: draft.workspacePath.trim(), + sessionId: draft.sessionId.trim(), + }; + const createdJob = await cronAPI.createJob(request); + setSelectedJobId(createdJob.id); + setDraft(jobToDraft(createdJob)); + notificationService.success(t('nav.scheduledJobs.messages.createSuccess')); + } + + await loadJobs(); + } catch (error) { + log.error('Failed to save scheduled job', { error }); + notificationService.error( + t('nav.scheduledJobs.messages.saveFailed', { + error: error instanceof Error ? error.message : String(error), + }) + ); + } finally { + setSaving(false); + } + }, [draft, loadJobs, selectedJobId, t]); + + const sessionOptions = useMemo(() => { + return workspaceSessions.map((session) => ({ + value: session.sessionId, + label: resolveSessionLabel(session), + description: session.title || session.sessionId, + })); + }, [workspaceSessions]); + + const canCreateJob = Boolean(draft.workspacePath.trim() && draft.sessionId.trim()); + + return ( + {displayTargetLabel} + ) : undefined} + size="xlarge" + contentInset={false} + > +
+
+
+
+
+ + {t('nav.scheduledJobs.listTitle')} +
+
+
+ + +
+
+ + {loading ? ( +
+ {t('nav.scheduledJobs.loading')} +
+ ) : sortedJobs.length === 0 ? ( +
+
+ {t('nav.scheduledJobs.empty.title')} +
+
+ {t('nav.scheduledJobs.empty.description')} +
+
+ ) : ( +
+ {sortedJobs.map((job) => { + const isSelected = selectedJobId === job.id; + return ( +
handleSelectJob(job)} + role="button" + tabIndex={0} + onKeyDown={(event) => { + if (event.key === 'Enter' || event.key === ' ') { + event.preventDefault(); + handleSelectJob(job); + } + }} + > +
+
+ {job.name} + + {formatStatusLabel(job, t)} + +
+
+ {formatScheduleSummary(job.schedule, t)} +
+
+ {t('nav.scheduledJobs.nextRunLabel')}: {formatTimestamp(getNextExecutionAtMs(job), t)} +
+ {job.state.lastError ? ( +
+ {job.state.lastError} +
+ ) : null} +
+
+ { + event.stopPropagation(); + void handleToggleEnabled(job, event.currentTarget.checked); + }} + aria-label={t('nav.scheduledJobs.actions.toggleEnabled')} + /> + +
+
+ ); + })} +
+ )} +
+ +
+
+
+
+ {selectedJobId + ? t('nav.scheduledJobs.editor.editTitle') + : t('nav.scheduledJobs.editor.createTitle')} +
+
+
+ +
+
+ + {!hideTargetFields ? ( +
+ { + const workspacePath = event.currentTarget.value; + setDraft((current) => ({ ...current, workspacePath })); + }} + placeholder={t('nav.scheduledJobs.placeholders.workspacePath')} + /> + { + const name = event.currentTarget.value; + setDraft((current) => ({ ...current, name })); + }} + placeholder={t('nav.scheduledJobs.placeholders.name')} + /> + +
+
+ {t('nav.scheduledJobs.fields.enabled')} +
+ { + const enabled = event.currentTarget.checked; + setDraft((current) => ({ ...current, enabled })); + }} + /> +
+ + { + const at = event.currentTarget.value; + setDraft((current) => ({ ...current, at })); + }} + /> + ) : null} + + {draft.scheduleKind === 'every' ? ( + <> + { + const everyMinutes = event.currentTarget.value; + setDraft((current) => ({ ...current, everyMinutes })); + }} + placeholder="60" + /> + { + const anchorMs = event.currentTarget.value; + setDraft((current) => ({ ...current, anchorMs })); + }} + placeholder={t('nav.scheduledJobs.placeholders.anchorMs')} + /> + + ) : null} + + {draft.scheduleKind === 'cron' ? ( + <> + { + const expr = event.currentTarget.value; + setDraft((current) => ({ ...current, expr })); + }} + placeholder="0 8 * * *" + /> + { + const tz = event.currentTarget.value; + setDraft((current) => ({ ...current, tz })); + }} + placeholder={t('nav.scheduledJobs.placeholders.timezone')} + /> + + ) : null} +
+ +