diff --git a/agent_core/core/impl/action/router.py b/agent_core/core/impl/action/router.py index 975766ca..12f1fef9 100644 --- a/agent_core/core/impl/action/router.py +++ b/agent_core/core/impl/action/router.py @@ -16,6 +16,7 @@ from agent_core.core.protocols.context import ContextEngineProtocol from agent_core.core.protocols.llm import LLMInterfaceProtocol from agent_core.core.impl.llm import LLMCallType +from agent_core.core.impl.llm.errors import LLMConsecutiveFailureError from agent_core.core.prompts import ( SELECT_ACTION_PROMPT, SELECT_ACTION_IN_TASK_PROMPT, @@ -620,6 +621,9 @@ async def _prompt_for_decision( f"{raw_response} | error={feedback_error}" ) current_prompt = self._augment_prompt_with_feedback(prompt, attempt + 1, raw_response, feedback_error) + except LLMConsecutiveFailureError: + # Fatal: LLM is in a broken state - re-raise immediately, do not retry + raise except RuntimeError as e: # LLM provider error (empty response, API error, auth failure, etc.) error_msg = str(e) @@ -633,8 +637,8 @@ async def _prompt_for_decision( raise last_error # Otherwise, retry with more context in the prompt current_prompt = self._augment_prompt_with_feedback( - prompt, attempt + 1, - f"[LLM ERROR] {error_msg}", + prompt, attempt + 1, + f"[LLM ERROR] {error_msg}", "LLM provider failed - retrying" ) except Exception as e: diff --git a/agent_core/core/impl/event_stream/__init__.py b/agent_core/core/impl/event_stream/__init__.py index bb2175d9..527b8c21 100644 --- a/agent_core/core/impl/event_stream/__init__.py +++ b/agent_core/core/impl/event_stream/__init__.py @@ -9,10 +9,12 @@ # Re-export data classes from existing location from agent_core.core.event_stream.event import Event, EventRecord +# Token utilities (canonical location: agent_core.utils.token) +from agent_core.utils.token import count_tokens + # Implementation classes from agent_core.core.impl.event_stream.event_stream import ( EventStream, - count_tokens, get_cached_token_count, SEVERITIES, MAX_EVENT_INLINE_CHARS, diff --git a/agent_core/core/impl/event_stream/event_stream.py b/agent_core/core/impl/event_stream/event_stream.py index 7603da49..b9e00d17 100644 --- a/agent_core/core/impl/event_stream/event_stream.py +++ b/agent_core/core/impl/event_stream/event_stream.py @@ -26,38 +26,12 @@ from sklearn.feature_extraction.text import TfidfVectorizer from agent_core.utils.logger import logger from agent_core.decorators import profiler, OperationCategory +from agent_core.utils.token import count_tokens import threading -import tiktoken -# Ensure tiktoken extension encodings (cl100k_base, etc.) are registered. -# Required for tiktoken >= 0.12 and PyInstaller frozen builds. -try: - import tiktoken_ext.openai_public # noqa: F401 -except ImportError: - pass SEVERITIES = ("DEBUG", "INFO", "WARN", "ERROR") MAX_EVENT_INLINE_CHARS = 200000 -# Token counting utility -_tokenizer = None - -def _get_tokenizer(): - """Get or create the tiktoken tokenizer (cached for performance).""" - global _tokenizer - if _tokenizer is None: - try: - _tokenizer = tiktoken.get_encoding("cl100k_base") - except Exception: - # Fallback: use o200k_base if cl100k_base is unavailable - _tokenizer = tiktoken.get_encoding("o200k_base") - return _tokenizer - -def count_tokens(text: str) -> int: - """Count the number of tokens in a text string using tiktoken.""" - if not text: - return 0 - return len(_get_tokenizer().encode(text)) - def get_cached_token_count(rec: "EventRecord") -> int: """Get token count for an EventRecord, using cached value if available. @@ -281,6 +255,16 @@ def summarize_by_LLM(self) -> None: ) try: + # Skip LLM call if the LLM is already in a consecutive failure state + max_failures = getattr(self.llm, "_max_consecutive_failures", 5) + current_failures = getattr(self.llm, "consecutive_failures", 0) + if current_failures >= max_failures: + logger.warning( + f"[EventStream] Skipping LLM summarization: LLM has {current_failures} " + f"consecutive failures (max={max_failures}). Falling back to prune." + ) + raise RuntimeError("LLM in consecutive failure state, skip summarization") + logger.info(f"[EventStream] Running synchronous summarization ({self._total_tokens} tokens)") llm_output = self.llm.generate_response(user_prompt=prompt) new_summary = (llm_output or "").strip() diff --git a/agent_core/core/impl/llm/interface.py b/agent_core/core/impl/llm/interface.py index a1911824..81cb10b5 100644 --- a/agent_core/core/impl/llm/interface.py +++ b/agent_core/core/impl/llm/interface.py @@ -103,7 +103,7 @@ def __init__( api_key: Optional[str] = None, base_url: Optional[str] = None, temperature: float = 0.0, - max_tokens: int = 8000, + max_tokens: int = 50000, deferred: bool = False, get_token_count: Optional[GetTokenCountHook] = None, set_token_count: Optional[SetTokenCountHook] = None, @@ -160,6 +160,8 @@ def __init__( self.byteplus_base_url: Optional[str] = None # Store system prompts for lazy session creation (instance variable) self._session_system_prompts: Dict[str, str] = {} + # Anthropic multi-turn session message history for KV cache accumulation + self._anthropic_session_messages: Dict[str, List[dict]] = {} if ctx["byteplus"]: self.api_key = ctx["byteplus"]["api_key"] @@ -242,11 +244,13 @@ def reinitialize( base_url=self.byteplus_base_url, model=self.model, ) - # Reset session system prompts + # Reset session system prompts and Anthropic message history self._session_system_prompts = {} + self._anthropic_session_messages = {} else: self._byteplus_cache_manager = None self._session_system_prompts = {} + self._anthropic_session_messages = {} # Reinitialize Gemini cache manager if self._gemini_client: @@ -518,9 +522,10 @@ def end_session_cache(self, task_id: str, call_type: str) -> None: task_id: The task ID. call_type: Type of LLM call (use LLMCallType enum values). """ - # Clean up stored system prompt + # Clean up stored system prompt and Anthropic message history session_key = f"{task_id}:{call_type}" system_prompt = self._session_system_prompts.pop(session_key, None) + self._anthropic_session_messages.pop(session_key, None) # Clean up provider-specific caches if self.provider == "byteplus" and self._byteplus_cache_manager: @@ -548,6 +553,11 @@ def end_all_session_caches(self, task_id: str) -> None: if call_type: prompts_and_types.append((system_prompt, call_type)) + # Clean up Anthropic multi-turn message history + anthropic_keys = [k for k in self._anthropic_session_messages if k.startswith(f"{task_id}:")] + for key in anthropic_keys: + self._anthropic_session_messages.pop(key, None) + # Clean up provider-specific caches if self.provider == "byteplus" and self._byteplus_cache_manager: self._byteplus_cache_manager.end_all_sessions_for_task(task_id) @@ -682,9 +692,8 @@ def _generate_response_with_session_sync( logger.info(f"[LLM RECV] {cleaned}") return cleaned - # Handle Anthropic with call_type-based extended TTL caching + # Handle Anthropic with multi-turn KV caching if self.provider == "anthropic" and self._anthropic_client: - # Get stored system prompt or use provided one session_key = f"{task_id}:{call_type}" stored_system_prompt = self._session_system_prompts.get(session_key) effective_system_prompt = system_prompt_for_new_session or stored_system_prompt @@ -694,8 +703,68 @@ def _generate_response_with_session_sync( f"No system prompt for task {task_id}:{call_type}" ) - # Use Anthropic with call_type for extended 1-hour TTL caching - response = self._generate_anthropic(effective_system_prompt, user_prompt, call_type=call_type) + # Get or initialize multi-turn message history + if session_key not in self._anthropic_session_messages: + self._anthropic_session_messages[session_key] = [] + + history = self._anthropic_session_messages[session_key] + + # Build messages: history (with cache_control on last assistant) + new user msg + messages: List[dict] = [] + + # Copy history messages (strip old cache_control, we'll re-place it) + for msg in history: + msg_copy = {"role": msg["role"]} + content = msg["content"] + if isinstance(content, list): + # Strip cache_control from content blocks + msg_copy["content"] = [ + {k: v for k, v in block.items() if k != "cache_control"} + for block in content + ] + else: + msg_copy["content"] = content + messages.append(msg_copy) + + # Place cache_control on the LAST assistant message for prefix caching + if messages: + cache_control = {"type": "ephemeral"} + if call_type: + cache_control["ttl"] = "1h" + for i in range(len(messages) - 1, -1, -1): + if messages[i]["role"] == "assistant": + content = messages[i]["content"] + if isinstance(content, str): + messages[i]["content"] = [ + {"type": "text", "text": content, "cache_control": cache_control} + ] + elif isinstance(content, list): + # Add cache_control to the last text block + for j in range(len(content) - 1, -1, -1): + if content[j].get("type") == "text": + content[j]["cache_control"] = cache_control + break + break + + # Append the new user message + messages.append({"role": "user", "content": user_prompt}) + + logger.debug( + f"[ANTHROPIC SESSION] {session_key}: {len(history)} history msgs, " + f"sending {len(messages)} total msgs" + ) + + # Call Anthropic with the full multi-turn messages + response = self._generate_anthropic( + effective_system_prompt, user_prompt, call_type=call_type, messages=messages + ) + + # On success, accumulate the user message + assistant response in history + assistant_content = response.get("content", "") + if assistant_content and not response.get("error"): + history.append({"role": "user", "content": user_prompt}) + history.append({"role": "assistant", "content": assistant_content}) + cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) current_count = self._get_token_count() self._set_token_count(current_count + response.get("tokens_used", 0)) @@ -1570,13 +1639,19 @@ def _generate_byteplus_standard( @profile("llm_anthropic_call", OperationCategory.LLM) def _generate_anthropic( - self, system_prompt: str | None, user_prompt: str, call_type: Optional[str] = None + self, system_prompt: str | None, user_prompt: str, + call_type: Optional[str] = None, + messages: Optional[List[dict]] = None, ) -> Dict[str, Any]: """Generate response using Anthropic with prompt caching. Anthropic's prompt caching uses `cache_control` markers on content blocks. When the system prompt is long enough (≥1024 tokens), we enable caching. + For multi-turn sessions, pass pre-built `messages` with cache_control on the + last assistant message. This enables prefix caching of the entire conversation + history, not just the system prompt. + TTL Options: - Default (5 minutes): Free, uses "ephemeral" type - Extended (1 hour): When call_type is provided, uses extended TTL for better @@ -1588,6 +1663,8 @@ def _generate_anthropic( user_prompt: The user prompt for this request. call_type: Optional call type (e.g., "reasoning", "action_selection"). When provided, uses extended 1-hour TTL for better cache hit rates. + messages: Optional pre-built messages list for multi-turn sessions. + When provided, used instead of building a single-turn message. Cache hits are logged when `cache_read_input_tokens` > 0 in the response. """ @@ -1604,11 +1681,12 @@ def _generate_anthropic( if not self._anthropic_client: raise RuntimeError("Anthropic client was not initialised.") - # Build the message - rely on system prompt for JSON formatting + # Build the message - use pre-built messages for multi-turn, or single-turn + # Anthropic requires max_tokens; use 16384 (Claude 4 default) to avoid truncation message_kwargs: Dict[str, Any] = { "model": self.model, - "max_tokens": self.max_tokens, - "messages": [ + "max_tokens": 16384, + "messages": messages if messages is not None else [ {"role": "user", "content": user_prompt}, ], } diff --git a/agent_core/utils/__init__.py b/agent_core/utils/__init__.py index c7b73bf4..6e719e6f 100644 --- a/agent_core/utils/__init__.py +++ b/agent_core/utils/__init__.py @@ -2,5 +2,6 @@ """Utility modules for agent-core.""" from agent_core.utils.logger import logger, define_log_level, configure_logging +from agent_core.utils.token import count_tokens -__all__ = ["logger", "define_log_level", "configure_logging"] +__all__ = ["logger", "define_log_level", "configure_logging", "count_tokens"] diff --git a/agent_core/utils/token.py b/agent_core/utils/token.py new file mode 100644 index 00000000..6522f956 --- /dev/null +++ b/agent_core/utils/token.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +""" +Token counting utilities using tiktoken. + +Provides a cached tokenizer and token counting functions used +across agent_core and app layers. +""" + +import tiktoken + +# Ensure tiktoken extension encodings (cl100k_base, etc.) are registered. +# Required for tiktoken >= 0.12 and PyInstaller frozen builds. +try: + import tiktoken_ext.openai_public # noqa: F401 +except ImportError: + pass + +_tokenizer = None + + +def _get_tokenizer(): + """Get or create the tiktoken tokenizer (cached for performance).""" + global _tokenizer + if _tokenizer is None: + try: + _tokenizer = tiktoken.get_encoding("cl100k_base") + except Exception: + # Fallback: use o200k_base if cl100k_base is unavailable + _tokenizer = tiktoken.get_encoding("o200k_base") + return _tokenizer + + +def count_tokens(text: str) -> int: + """Count the number of tokens in a text string using tiktoken.""" + if not text: + return 0 + return len(_get_tokenizer().encode(text)) diff --git a/agent_file_system/AGENT.md b/agent_file_system/AGENT.md index e86df1a5..426f8b5d 100644 --- a/agent_file_system/AGENT.md +++ b/agent_file_system/AGENT.md @@ -144,46 +144,6 @@ When you learn something useful (user preferences, project context, solutions to - Use `stream_edit` to update USER.md with user preferences you discover - Use `stream_edit` to update AGENT.md with operational improvements -## Long Task - Research Note Caching - -Your event stream summarizes older events to stay within token limits. This means detailed findings from earlier in a long task can be lost. To prevent this, cache your research by writing to files. - -When to use: -- Any task involving extended research, investigation, or many action cycles -- When you're accumulating findings you'll need to reference later - -How: -1. Create `workspace/research_.md` early in the task -2. Append findings as you go (every few action cycles) -3. Re-read the file when you need earlier findings that may no longer be in your event stream -4. Delete the file at task end if the findings are no longer needed, or keep it if they may be useful later - -Think of files as your external memory - your event stream is your short-term memory (limited), files are your long-term memory (permanent). - -## Missions - Multi-Session Task Management - -A "mission" is an ongoing effort that spans multiple tasks across your lifetime. Missions solve the problem of losing context between separate task sessions. - -Convention: -- Create a folder: `workspace/missions//` -- Create an INDEX.md inside following the template at `app/data/agent_file_system_template/MISSION_INDEX_TEMPLATE.md`. Read the template first, then fill in the sections for your mission. -- Store all related notes and artifacts in the mission folder - -Mission discovery (at the start of every complex task): -- Check `workspace/missions/` for existing missions -- If the current task relates to an existing mission, read its INDEX.md and work within it -- If the user says "this is part of mission X" or "continue mission X", link to that mission -- Create a new mission when: user requests it, task spans multiple sessions, or task continues previous work - -At the end of a mission-linked task: -- Update INDEX.md with what was accomplished, current status, and next steps -- This is what enables the next task that picks up the mission to have full context - -Missions vs MEMORY.md: -- MEMORY.md = "what I've learned" (permanent agent-wide knowledge: user prefs, patterns, references) -- Missions = "what I'm working on" (active project state: research data, findings, status) -- At mission completion, distill key learnings into MEMORY.md - ## Proactive Behavior You activate on schedules (hourly/daily/weekly/monthly). diff --git a/agent_file_system/MISSION_INDEX_TEMPLATE.md b/agent_file_system/MISSION_INDEX_TEMPLATE.md new file mode 100644 index 00000000..40010b34 --- /dev/null +++ b/agent_file_system/MISSION_INDEX_TEMPLATE.md @@ -0,0 +1,52 @@ +# [Mission Name] + +[One-line description of what this mission aims to achieve] + +## Goal + +[Clear statement of the mission objective. What does "done" look like?] + +- [Specific goal or deliverable 1] +- [Specific goal or deliverable 2] +- [Specific goal or deliverable 3] + +## Status + +**Current phase**: [Not started / In progress / Blocked / Completed / Abandoned] +**Last updated**: [YYYY-MM-DD] +**Last task summary**: [One-line summary of what the most recent task accomplished] + +## Key Findings + +[Summarized discoveries, results, and important information gathered so far. This section is the most critical - it's what future tasks read to restore context.] + +- [Finding 1] +- [Finding 2] + +## What's Been Tried + +[Approaches attempted, including what worked and what didn't. Prevents future tasks from repeating failed approaches.] + +- [Approach 1]: [outcome] +- [Approach 2]: [outcome] + +## Next Steps + +[Concrete actions for the next task to pick up. Be specific enough that a fresh task session can start working immediately.] + +1. [Next action 1] +2. [Next action 2] + +## Resources & References + +[External links, file paths, tools, or contacts relevant to this mission] + +- [Resource 1] +- [Resource 2] + +## Constraints & Notes + +[Any limitations, deadlines, user preferences, or important context that affects how work should be done] + +- [Constraint or note 1] +- [Constraint or note 2] diff --git a/app/agent_base.py b/app/agent_base.py index c2222cd4..5ff0e7d5 100644 --- a/app/agent_base.py +++ b/app/agent_base.py @@ -1200,7 +1200,16 @@ async def _handle_react_error( user_message = classify_llm_error(error) # Fatal LLM errors must not re-queue the task - that causes infinite retry loops - is_fatal_llm_error = isinstance(error, LLMConsecutiveFailureError) + # Walk the full exception chain (__cause__, __context__) to detect wrapped errors + is_fatal_llm_error = False + exc: BaseException | None = error + while exc is not None: + if isinstance(exc, LLMConsecutiveFailureError): + is_fatal_llm_error = True + break + exc = exc.__cause__ or exc.__context__ + if exc is error: # prevent infinite loop on circular chains + break try: logger.debug("[REACT ERROR] Logging to event stream") diff --git a/app/config.py b/app/config.py index 9bf11943..99612567 100644 --- a/app/config.py +++ b/app/config.py @@ -45,6 +45,8 @@ def _get_default_settings() -> Dict[str, Any]: "vlm_provider": "anthropic", "llm_model": None, "vlm_model": None, + "slow_mode": False, + "slow_mode_tpm_limit": 30000, }, "api_keys": { "openai": "", @@ -187,6 +189,18 @@ def reload_settings() -> Dict[str, Any]: return get_settings(reload=True) +def is_slow_mode_enabled() -> bool: + """Check if slow mode (rate limiting) is enabled.""" + settings = get_settings() + return settings.get("model", {}).get("slow_mode", False) + + +def get_slow_mode_tpm_limit() -> int: + """Get the tokens-per-minute limit for slow mode.""" + settings = get_settings() + return settings.get("model", {}).get("slow_mode_tpm_limit", 30000) + + def save_settings(settings: Dict[str, Any]) -> None: """Save settings to settings.json. diff --git a/app/config/settings.json b/app/config/settings.json index 93816e96..bfe2db6b 100644 --- a/app/config/settings.json +++ b/app/config/settings.json @@ -10,10 +10,12 @@ "enabled": true }, "model": { - "llm_provider": "deepseek", - "vlm_provider": "deepseek", - "llm_model": "DeepSeek-V3.2. ", - "vlm_model": "DeepSeek-V3.2. " + "llm_provider": "anthropic", + "vlm_provider": "anthropic", + "llm_model": "claude-sonnet-4-5", + "vlm_model": "claude-sonnet-4-5", + "slow_mode": true, + "slow_mode_tpm_limit": 25000 }, "api_keys": { "openai": "", diff --git a/app/llm_interface.py b/app/llm_interface.py index efeefc01..02427c5a 100644 --- a/app/llm_interface.py +++ b/app/llm_interface.py @@ -807,6 +807,8 @@ def __init__( self._byteplus_cache_manager: Optional[BytePlusCacheManager] = None # Store system prompts for lazy session creation (instance variable) self._session_system_prompts: Dict[str, str] = {} + # Anthropic multi-turn session message history for KV cache accumulation + self._anthropic_session_messages: Dict[str, List[dict]] = {} if ctx["byteplus"]: self.api_key = ctx["byteplus"]["api_key"] @@ -879,11 +881,13 @@ def reinitialize( base_url=self.byteplus_base_url, model=self.model, ) - # Reset session system prompts + # Reset session system prompts and Anthropic message history self._session_system_prompts = {} + self._anthropic_session_messages = {} else: self._byteplus_cache_manager = None self._session_system_prompts = {} + self._anthropic_session_messages = {} # Reinitialize Gemini cache manager if self._gemini_client: @@ -917,6 +921,15 @@ def _generate_response_sync( if log_response: logger.info(f"[LLM SEND] system={system_prompt} | user={user_prompt}") + # Slow mode: throttle before making the API call + from app.config import is_slow_mode_enabled + _slow_mode_active = is_slow_mode_enabled() + if _slow_mode_active: + from agent_core.utils.token import count_tokens + from app.rate_limiter import get_rate_limiter + estimated = count_tokens(system_prompt or "") + count_tokens(user_prompt) + get_rate_limiter().wait_if_needed(estimated) + if self.provider == "openai": response = self._generate_openai(system_prompt, user_prompt) elif self.provider == "remote": @@ -932,7 +945,13 @@ def _generate_response_sync( cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) - STATE.set_agent_property("token_count", STATE.get_agent_property("token_count", 0) + response.get("tokens_used", 0)) + tokens_used = response.get("tokens_used", 0) + STATE.set_agent_property("token_count", STATE.get_agent_property("token_count", 0) + tokens_used) + + if _slow_mode_active and tokens_used > 0: + from app.rate_limiter import get_rate_limiter + get_rate_limiter().record_usage(tokens_used) + if log_response: logger.info(f"[LLM RECV] {cleaned}") return cleaned @@ -1026,9 +1045,10 @@ def end_session_cache(self, task_id: str, call_type: str) -> None: task_id: The task ID. call_type: Type of LLM call (use LLMCallType enum values). """ - # Clean up stored system prompt + # Clean up stored system prompt and Anthropic message history session_key = f"{task_id}:{call_type}" system_prompt = self._session_system_prompts.pop(session_key, None) + self._anthropic_session_messages.pop(session_key, None) # Clean up provider-specific caches if self.provider == "byteplus" and self._byteplus_cache_manager: @@ -1056,6 +1076,11 @@ def end_all_session_caches(self, task_id: str) -> None: if call_type: prompts_and_types.append((system_prompt, call_type)) + # Clean up Anthropic multi-turn message history + anthropic_keys = [k for k in self._anthropic_session_messages if k.startswith(f"{task_id}:")] + for key in anthropic_keys: + self._anthropic_session_messages.pop(key, None) + # Clean up provider-specific caches if self.provider == "byteplus" and self._byteplus_cache_manager: self._byteplus_cache_manager.end_all_sessions_for_task(task_id) @@ -1166,6 +1191,15 @@ def _generate_response_with_session_sync( if log_response: logger.info(f"[LLM SESSION] task={task_id} call_type={call_type} | user={user_prompt}") + # Slow mode: throttle before making the API call + from app.config import is_slow_mode_enabled + _slow_mode_active = is_slow_mode_enabled() + if _slow_mode_active: + from agent_core.utils.token import count_tokens + from app.rate_limiter import get_rate_limiter + estimated = count_tokens(user_prompt) + get_rate_limiter().wait_if_needed(estimated) + # Handle Gemini with explicit caching (per call_type) if self.provider == "gemini" and self._gemini_cache_manager: # Get stored system prompt or use provided one @@ -1181,10 +1215,14 @@ def _generate_response_with_session_sync( # Use Gemini with explicit caching (call_type passed for cache keying) response = self._generate_gemini(effective_system_prompt, user_prompt, call_type=call_type) cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) + _tokens_used = response.get("tokens_used", 0) STATE.set_agent_property( "token_count", - STATE.get_agent_property("token_count", 0) + response.get("tokens_used", 0) + STATE.get_agent_property("token_count", 0) + _tokens_used ) + if _slow_mode_active and _tokens_used > 0: + from app.rate_limiter import get_rate_limiter + get_rate_limiter().record_usage(_tokens_used) if log_response: logger.info(f"[LLM RECV] {cleaned}") return cleaned @@ -1204,17 +1242,20 @@ def _generate_response_with_session_sync( # Use OpenAI with call_type for better cache routing via prompt_cache_key response = self._generate_openai(effective_system_prompt, user_prompt, call_type=call_type) cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) + _tokens_used = response.get("tokens_used", 0) STATE.set_agent_property( "token_count", - STATE.get_agent_property("token_count", 0) + response.get("tokens_used", 0) + STATE.get_agent_property("token_count", 0) + _tokens_used ) + if _slow_mode_active and _tokens_used > 0: + from app.rate_limiter import get_rate_limiter + get_rate_limiter().record_usage(_tokens_used) if log_response: logger.info(f"[LLM RECV] {cleaned}") return cleaned - # Handle Anthropic with call_type-based extended TTL caching + # Handle Anthropic with multi-turn KV caching if self.provider == "anthropic" and self._anthropic_client: - # Get stored system prompt or use provided one session_key = f"{task_id}:{call_type}" stored_system_prompt = self._session_system_prompts.get(session_key) effective_system_prompt = system_prompt_for_new_session or stored_system_prompt @@ -1224,13 +1265,79 @@ def _generate_response_with_session_sync( f"No system prompt for task {task_id}:{call_type}" ) - # Use Anthropic with call_type for extended 1-hour TTL caching - response = self._generate_anthropic(effective_system_prompt, user_prompt, call_type=call_type) + # Get or initialize multi-turn message history + if session_key not in self._anthropic_session_messages: + self._anthropic_session_messages[session_key] = [] + + history = self._anthropic_session_messages[session_key] + + # Build messages: history (with cache_control on last assistant) + new user msg + messages: List[dict] = [] + + # Copy history messages (strip old cache_control, we'll re-place it) + for msg in history: + msg_copy = {"role": msg["role"]} + content = msg["content"] + if isinstance(content, list): + # Strip cache_control from content blocks + msg_copy["content"] = [ + {k: v for k, v in block.items() if k != "cache_control"} + for block in content + ] + else: + msg_copy["content"] = content + messages.append(msg_copy) + + # Place cache_control on the LAST assistant message for prefix caching + if messages: + cache_control = {"type": "ephemeral"} + if call_type: + cache_control["ttl"] = "1h" + for i in range(len(messages) - 1, -1, -1): + if messages[i]["role"] == "assistant": + content = messages[i]["content"] + if isinstance(content, str): + messages[i]["content"] = [ + {"type": "text", "text": content, "cache_control": cache_control} + ] + elif isinstance(content, list): + # Add cache_control to the last text block + for j in range(len(content) - 1, -1, -1): + if content[j].get("type") == "text": + content[j]["cache_control"] = cache_control + break + break + + # Append the new user message + messages.append({"role": "user", "content": user_prompt}) + + logger.debug( + f"[ANTHROPIC SESSION] {session_key}: {len(history)} history msgs, " + f"sending {len(messages)} total msgs" + ) + + # Call Anthropic with the full multi-turn messages + # Note: _generate_anthropic adds JSON prefill as the last message automatically + response = self._generate_anthropic( + effective_system_prompt, user_prompt, call_type=call_type, messages=messages + ) + + # On success, accumulate user message + assistant response in history + # The response content already has '{' prepended from JSON prefill + assistant_content = response.get("content", "") + if assistant_content and "error" not in response: + history.append({"role": "user", "content": user_prompt}) + history.append({"role": "assistant", "content": assistant_content}) + cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) + _tokens_used = response.get("tokens_used", 0) STATE.set_agent_property( "token_count", - STATE.get_agent_property("token_count", 0) + response.get("tokens_used", 0) + STATE.get_agent_property("token_count", 0) + _tokens_used ) + if _slow_mode_active and _tokens_used > 0: + from app.rate_limiter import get_rate_limiter + get_rate_limiter().record_usage(_tokens_used) if log_response: logger.info(f"[LLM RECV] {cleaned}") return cleaned @@ -1311,10 +1418,14 @@ def _generate_response_with_session_sync( cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip()) + _tokens_used = response.get("tokens_used", 0) STATE.set_agent_property( "token_count", - STATE.get_agent_property("token_count", 0) + response.get("tokens_used", 0) + STATE.get_agent_property("token_count", 0) + _tokens_used ) + if _slow_mode_active and _tokens_used > 0: + from app.rate_limiter import get_rate_limiter + get_rate_limiter().record_usage(_tokens_used) if log_response: logger.info(f"[LLM RECV] {cleaned}") return cleaned @@ -2050,13 +2161,19 @@ def _generate_byteplus_standard( @profile("llm_anthropic_call", OperationCategory.LLM) def _generate_anthropic( - self, system_prompt: str | None, user_prompt: str, call_type: Optional[str] = None + self, system_prompt: str | None, user_prompt: str, + call_type: Optional[str] = None, + messages: Optional[List[dict]] = None, ) -> Dict[str, Any]: """Generate response using Anthropic with prompt caching. Anthropic's prompt caching uses `cache_control` markers on content blocks. When the system prompt is long enough (≥1024 tokens), we enable caching. + For multi-turn sessions, pass pre-built `messages` with cache_control on the + last assistant message. This enables prefix caching of the entire conversation + history, not just the system prompt. + TTL Options: - Default (5 minutes): Free, uses "ephemeral" type - Extended (1 hour): When call_type is provided, uses extended TTL for better @@ -2068,6 +2185,9 @@ def _generate_anthropic( user_prompt: The user prompt for this request. call_type: Optional call type (e.g., "reasoning", "action_selection"). When provided, uses extended 1-hour TTL for better cache hit rates. + messages: Optional pre-built messages list for multi-turn sessions. + When provided, used instead of building a single-turn message. + JSON prefill is added automatically when applicable. Cache hits are logged when `cache_read_input_tokens` > 0 in the response. """ @@ -2087,16 +2207,20 @@ def _generate_anthropic( # Always enable JSON mode via prefilling json_mode = True - # Build the message with optional system prompt - messages = [{"role": "user", "content": user_prompt}] - # For JSON mode, use prefilling to force JSON output + # Build the message list: use pre-built messages for multi-turn, or single-turn + if messages is not None: + api_messages = list(messages) # Copy to avoid mutating caller's list + else: + api_messages = [{"role": "user", "content": user_prompt}] + # For JSON mode, use prefilling to force JSON output (always last message) if json_mode: - messages.append({"role": "assistant", "content": "{"}) + api_messages.append({"role": "assistant", "content": "{"}) + # Anthropic requires max_tokens; use 16384 (Claude 4 default) to avoid truncation message_kwargs: Dict[str, Any] = { "model": self.model, - "max_tokens": self.max_tokens, - "messages": messages, + "max_tokens": 16384, + "messages": api_messages, } if system_prompt: diff --git a/app/rate_limiter.py b/app/rate_limiter.py new file mode 100644 index 00000000..a234d230 --- /dev/null +++ b/app/rate_limiter.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +""" +Sliding-window token rate limiter for Slow Mode. + +When Slow Mode is enabled, this module throttles LLM calls to stay +within a configurable tokens-per-minute (TPM) limit. +""" + +import logging +import time +import threading +from collections import deque +from typing import Tuple + +logger = logging.getLogger(__name__) + + +class TokenRateLimiter: + """Sliding-window rate limiter for tokens per minute.""" + + def __init__(self): + self._window: deque[Tuple[float, int]] = deque() # (timestamp, token_count) + self._lock = threading.Lock() + + def _get_tpm_limit(self) -> int: + """Read TPM limit from settings (single source of truth).""" + from app.config import get_slow_mode_tpm_limit + return get_slow_mode_tpm_limit() + + def _prune_window(self): + """Remove entries older than 60 seconds.""" + cutoff = time.monotonic() - 60.0 + while self._window and self._window[0][0] < cutoff: + self._window.popleft() + + def tokens_used_in_window(self) -> int: + """Return total tokens consumed in the current 60-second window.""" + with self._lock: + self._prune_window() + return sum(t for _, t in self._window) + + def wait_if_needed(self, estimated_tokens: int = 0) -> float: + """Block until there is capacity for estimated_tokens. + + Returns the number of seconds waited. + """ + tpm_limit = self._get_tpm_limit() + waited = 0.0 + with self._lock: + while True: + self._prune_window() + used = sum(t for _, t in self._window) + if used + estimated_tokens <= tpm_limit: + break + if not self._window: + break + # Wait until the oldest entry expires from the window + oldest_ts = self._window[0][0] + wait_time = oldest_ts + 60.0 - time.monotonic() + 0.1 + if wait_time <= 0: + continue + # Release lock while sleeping so other threads aren't blocked + self._lock.release() + logger.info( + f"[SLOW MODE] Rate limit approaching ({used}/{tpm_limit} TPM). " + f"Waiting {wait_time:.1f}s..." + ) + time.sleep(wait_time) + waited += wait_time + self._lock.acquire() + return waited + + def record_usage(self, tokens: int): + """Record that tokens were consumed just now.""" + if tokens > 0: + with self._lock: + self._window.append((time.monotonic(), tokens)) + + def reset(self): + """Clear the sliding window.""" + with self._lock: + self._window.clear() + + +# Module-level singleton +_rate_limiter = TokenRateLimiter() + + +def get_rate_limiter() -> TokenRateLimiter: + """Return the global rate limiter instance.""" + return _rate_limiter diff --git a/app/ui_layer/adapters/browser_adapter.py b/app/ui_layer/adapters/browser_adapter.py index 1b7dff24..ee6288f7 100644 --- a/app/ui_layer/adapters/browser_adapter.py +++ b/app/ui_layer/adapters/browser_adapter.py @@ -1234,6 +1234,12 @@ async def _handle_ws_message(self, data: Dict[str, Any]) -> None: base_url = data.get("baseUrl") await self._handle_ollama_models_get(base_url) + elif msg_type == "slow_mode_get": + await self._handle_slow_mode_get() + + elif msg_type == "slow_mode_set": + await self._handle_slow_mode_set(data) + # MCP settings operations elif msg_type == "mcp_list": await self._handle_mcp_list() @@ -2837,6 +2843,36 @@ async def _handle_ollama_models_get(self, base_url: Optional[str] = None) -> Non "data": {"success": False, "models": [], "error": str(e)}, }) + # ───────────────────────────────────────────────────────────────────── + # Slow Mode Handlers + # ───────────────────────────────────────────────────────────────────── + + async def _handle_slow_mode_get(self) -> None: + """Get slow mode settings.""" + try: + from app.ui_layer.settings.model_settings import get_slow_mode_settings + result = get_slow_mode_settings() + await self._broadcast({"type": "slow_mode_get", "data": result}) + except Exception as e: + await self._broadcast({ + "type": "slow_mode_get", + "data": {"success": False, "error": str(e)}, + }) + + async def _handle_slow_mode_set(self, data: Dict[str, Any]) -> None: + """Set slow mode on or off.""" + try: + from app.ui_layer.settings.model_settings import set_slow_mode + enabled = data.get("enabled", False) + tpm_limit = data.get("tpmLimit") + result = set_slow_mode(enabled, tpm_limit) + await self._broadcast({"type": "slow_mode_set", "data": result}) + except Exception as e: + await self._broadcast({ + "type": "slow_mode_set", + "data": {"success": False, "error": str(e)}, + }) + # ───────────────────────────────────────────────────────────────────── # MCP Settings Handlers # ───────────────────────────────────────────────────────────────────── diff --git a/app/ui_layer/browser/frontend/src/pages/Settings/ModelSettings.tsx b/app/ui_layer/browser/frontend/src/pages/Settings/ModelSettings.tsx index 177a6727..561dfed4 100644 --- a/app/ui_layer/browser/frontend/src/pages/Settings/ModelSettings.tsx +++ b/app/ui_layer/browser/frontend/src/pages/Settings/ModelSettings.tsx @@ -61,6 +61,10 @@ export function ModelSettings() { const [newLlmModel, setNewLlmModel] = useState('') const [newVlmModel, setNewVlmModel] = useState('') + // Slow mode state + const [slowModeEnabled, setSlowModeEnabled] = useState(false) + const [isLoadingSlowMode, setIsLoadingSlowMode] = useState(true) + // UI state const [isSaving, setIsSaving] = useState(false) const [isTesting, setIsTesting] = useState(false) @@ -207,6 +211,22 @@ export function ModelSettings() { showToast('error', d.error || 'Model download failed') } }), + onMessage('slow_mode_get', (data: unknown) => { + const d = data as { success: boolean; enabled: boolean; tpm_limit: number } + setIsLoadingSlowMode(false) + if (d.success) { + setSlowModeEnabled(d.enabled) + } + }), + onMessage('slow_mode_set', (data: unknown) => { + const d = data as { success: boolean; enabled: boolean; error?: string } + if (d.success) { + setSlowModeEnabled(d.enabled) + showToast('success', `Slow mode ${d.enabled ? 'enabled' : 'disabled'}`) + } else { + showToast('error', d.error || 'Failed to update slow mode') + } + }), ] return () => cleanups.forEach(cleanup => cleanup()) @@ -218,6 +238,7 @@ export function ModelSettings() { send('model_providers_get') send('model_settings_get') + send('slow_mode_get') }, [isConnected, send]) // Fetch Ollama models whenever the active provider is 'remote' @@ -501,7 +522,7 @@ export function ModelSettings() { )} {/* Actions */} -
+
+ + {/* Slow Mode */} +
+
+
+ Slow Mode + + Limits token usage to stay within API rate limits. + Enable this if you experience rate limiting errors from your provider. + +
+ { + setSlowModeEnabled(e.target.checked) + send('slow_mode_set', { enabled: e.target.checked }) + }} + disabled={isLoadingSlowMode} + /> +
)} diff --git a/app/ui_layer/settings/model_settings.py b/app/ui_layer/settings/model_settings.py index 20d9fdf2..9cae78c6 100644 --- a/app/ui_layer/settings/model_settings.py +++ b/app/ui_layer/settings/model_settings.py @@ -484,3 +484,41 @@ def validate_can_save( "can_save": False, "errors": [str(e)], } + + +# ───────────────────────────────────────────────────────────────────── +# Slow Mode Settings +# ───────────────────────────────────────────────────────────────────── + +def get_slow_mode_settings() -> Dict[str, Any]: + """Get slow mode settings.""" + settings = _load_settings() + model = settings.get("model", {}) + return { + "success": True, + "enabled": model.get("slow_mode", False), + "tpm_limit": model.get("slow_mode_tpm_limit", 30000), + } + + +def set_slow_mode(enabled: bool, tpm_limit: Optional[int] = None) -> Dict[str, Any]: + """Set slow mode on or off, optionally updating the TPM limit.""" + settings = _load_settings() + if "model" not in settings: + settings["model"] = {} + settings["model"]["slow_mode"] = enabled + if tpm_limit is not None: + settings["model"]["slow_mode_tpm_limit"] = max(1000, tpm_limit) + + if _save_settings(settings): + from app.config import reload_settings + reload_settings() + # Reset the rate limiter window on setting change + from app.rate_limiter import get_rate_limiter + get_rate_limiter().reset() + return { + "success": True, + "enabled": enabled, + "tpm_limit": settings["model"].get("slow_mode_tpm_limit", 30000), + } + return {"success": False, "error": "Failed to save settings"}