Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions agent_core/core/impl/action/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from agent_core.core.protocols.context import ContextEngineProtocol
from agent_core.core.protocols.llm import LLMInterfaceProtocol
from agent_core.core.impl.llm import LLMCallType
from agent_core.core.impl.llm.errors import LLMConsecutiveFailureError
from agent_core.core.prompts import (
SELECT_ACTION_PROMPT,
SELECT_ACTION_IN_TASK_PROMPT,
Expand Down Expand Up @@ -620,6 +621,9 @@ async def _prompt_for_decision(
f"{raw_response} | error={feedback_error}"
)
current_prompt = self._augment_prompt_with_feedback(prompt, attempt + 1, raw_response, feedback_error)
except LLMConsecutiveFailureError:
# Fatal: LLM is in a broken state - re-raise immediately, do not retry
raise
except RuntimeError as e:
# LLM provider error (empty response, API error, auth failure, etc.)
error_msg = str(e)
Expand All @@ -633,8 +637,8 @@ async def _prompt_for_decision(
raise last_error
# Otherwise, retry with more context in the prompt
current_prompt = self._augment_prompt_with_feedback(
prompt, attempt + 1,
f"[LLM ERROR] {error_msg}",
prompt, attempt + 1,
f"[LLM ERROR] {error_msg}",
"LLM provider failed - retrying"
)
except Exception as e:
Expand Down
4 changes: 3 additions & 1 deletion agent_core/core/impl/event_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
# Re-export data classes from existing location
from agent_core.core.event_stream.event import Event, EventRecord

# Token utilities (canonical location: agent_core.utils.token)
from agent_core.utils.token import count_tokens

# Implementation classes
from agent_core.core.impl.event_stream.event_stream import (
EventStream,
count_tokens,
get_cached_token_count,
SEVERITIES,
MAX_EVENT_INLINE_CHARS,
Expand Down
38 changes: 11 additions & 27 deletions agent_core/core/impl/event_stream/event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,12 @@
from sklearn.feature_extraction.text import TfidfVectorizer
from agent_core.utils.logger import logger
from agent_core.decorators import profiler, OperationCategory
from agent_core.utils.token import count_tokens
import threading
import tiktoken
# Ensure tiktoken extension encodings (cl100k_base, etc.) are registered.
# Required for tiktoken >= 0.12 and PyInstaller frozen builds.
try:
import tiktoken_ext.openai_public # noqa: F401
except ImportError:
pass

SEVERITIES = ("DEBUG", "INFO", "WARN", "ERROR")
MAX_EVENT_INLINE_CHARS = 200000

# Token counting utility
_tokenizer = None

def _get_tokenizer():
    """Return the cached tiktoken encoder, building it on first call."""
    global _tokenizer
    if _tokenizer is not None:
        return _tokenizer
    try:
        enc = tiktoken.get_encoding("cl100k_base")
    except Exception:
        # o200k_base serves as the fallback when cl100k_base cannot be loaded
        enc = tiktoken.get_encoding("o200k_base")
    _tokenizer = enc
    return _tokenizer

def count_tokens(text: str) -> int:
    """Return how many tiktoken tokens *text* contains (empty input -> 0)."""
    if not text:
        return 0
    tokens = _get_tokenizer().encode(text)
    return len(tokens)


def get_cached_token_count(rec: "EventRecord") -> int:
"""Get token count for an EventRecord, using cached value if available.
Expand Down Expand Up @@ -281,6 +255,16 @@ def summarize_by_LLM(self) -> None:
)

try:
# Skip LLM call if the LLM is already in a consecutive failure state
max_failures = getattr(self.llm, "_max_consecutive_failures", 5)
current_failures = getattr(self.llm, "consecutive_failures", 0)
if current_failures >= max_failures:
logger.warning(
f"[EventStream] Skipping LLM summarization: LLM has {current_failures} "
f"consecutive failures (max={max_failures}). Falling back to prune."
)
raise RuntimeError("LLM in consecutive failure state, skip summarization")

logger.info(f"[EventStream] Running synchronous summarization ({self._total_tokens} tokens)")
llm_output = self.llm.generate_response(user_prompt=prompt)
new_summary = (llm_output or "").strip()
Expand Down
100 changes: 89 additions & 11 deletions agent_core/core/impl/llm/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(
api_key: Optional[str] = None,
base_url: Optional[str] = None,
temperature: float = 0.0,
max_tokens: int = 8000,
max_tokens: int = 50000,
deferred: bool = False,
get_token_count: Optional[GetTokenCountHook] = None,
set_token_count: Optional[SetTokenCountHook] = None,
Expand Down Expand Up @@ -160,6 +160,8 @@ def __init__(
self.byteplus_base_url: Optional[str] = None
# Store system prompts for lazy session creation (instance variable)
self._session_system_prompts: Dict[str, str] = {}
# Anthropic multi-turn session message history for KV cache accumulation
self._anthropic_session_messages: Dict[str, List[dict]] = {}

if ctx["byteplus"]:
self.api_key = ctx["byteplus"]["api_key"]
Expand Down Expand Up @@ -242,11 +244,13 @@ def reinitialize(
base_url=self.byteplus_base_url,
model=self.model,
)
# Reset session system prompts
# Reset session system prompts and Anthropic message history
self._session_system_prompts = {}
self._anthropic_session_messages = {}
else:
self._byteplus_cache_manager = None
self._session_system_prompts = {}
self._anthropic_session_messages = {}

# Reinitialize Gemini cache manager
if self._gemini_client:
Expand Down Expand Up @@ -518,9 +522,10 @@ def end_session_cache(self, task_id: str, call_type: str) -> None:
task_id: The task ID.
call_type: Type of LLM call (use LLMCallType enum values).
"""
# Clean up stored system prompt
# Clean up stored system prompt and Anthropic message history
session_key = f"{task_id}:{call_type}"
system_prompt = self._session_system_prompts.pop(session_key, None)
self._anthropic_session_messages.pop(session_key, None)

# Clean up provider-specific caches
if self.provider == "byteplus" and self._byteplus_cache_manager:
Expand Down Expand Up @@ -548,6 +553,11 @@ def end_all_session_caches(self, task_id: str) -> None:
if call_type:
prompts_and_types.append((system_prompt, call_type))

# Clean up Anthropic multi-turn message history
anthropic_keys = [k for k in self._anthropic_session_messages if k.startswith(f"{task_id}:")]
for key in anthropic_keys:
self._anthropic_session_messages.pop(key, None)

# Clean up provider-specific caches
if self.provider == "byteplus" and self._byteplus_cache_manager:
self._byteplus_cache_manager.end_all_sessions_for_task(task_id)
Expand Down Expand Up @@ -682,9 +692,8 @@ def _generate_response_with_session_sync(
logger.info(f"[LLM RECV] {cleaned}")
return cleaned

# Handle Anthropic with call_type-based extended TTL caching
# Handle Anthropic with multi-turn KV caching
if self.provider == "anthropic" and self._anthropic_client:
# Get stored system prompt or use provided one
session_key = f"{task_id}:{call_type}"
stored_system_prompt = self._session_system_prompts.get(session_key)
effective_system_prompt = system_prompt_for_new_session or stored_system_prompt
Expand All @@ -694,8 +703,68 @@ def _generate_response_with_session_sync(
f"No system prompt for task {task_id}:{call_type}"
)

# Use Anthropic with call_type for extended 1-hour TTL caching
response = self._generate_anthropic(effective_system_prompt, user_prompt, call_type=call_type)
# Get or initialize multi-turn message history
if session_key not in self._anthropic_session_messages:
self._anthropic_session_messages[session_key] = []

history = self._anthropic_session_messages[session_key]

# Build messages: history (with cache_control on last assistant) + new user msg
messages: List[dict] = []

# Copy history messages (strip old cache_control, we'll re-place it)
for msg in history:
msg_copy = {"role": msg["role"]}
content = msg["content"]
if isinstance(content, list):
# Strip cache_control from content blocks
msg_copy["content"] = [
{k: v for k, v in block.items() if k != "cache_control"}
for block in content
]
else:
msg_copy["content"] = content
messages.append(msg_copy)

# Place cache_control on the LAST assistant message for prefix caching
if messages:
cache_control = {"type": "ephemeral"}
if call_type:
cache_control["ttl"] = "1h"
for i in range(len(messages) - 1, -1, -1):
if messages[i]["role"] == "assistant":
content = messages[i]["content"]
if isinstance(content, str):
messages[i]["content"] = [
{"type": "text", "text": content, "cache_control": cache_control}
]
elif isinstance(content, list):
# Add cache_control to the last text block
for j in range(len(content) - 1, -1, -1):
if content[j].get("type") == "text":
content[j]["cache_control"] = cache_control
break
break

# Append the new user message
messages.append({"role": "user", "content": user_prompt})

logger.debug(
f"[ANTHROPIC SESSION] {session_key}: {len(history)} history msgs, "
f"sending {len(messages)} total msgs"
)

# Call Anthropic with the full multi-turn messages
response = self._generate_anthropic(
effective_system_prompt, user_prompt, call_type=call_type, messages=messages
)

# On success, accumulate the user message + assistant response in history
assistant_content = response.get("content", "")
if assistant_content and not response.get("error"):
history.append({"role": "user", "content": user_prompt})
history.append({"role": "assistant", "content": assistant_content})

cleaned = re.sub(self._CODE_BLOCK_RE, "", response.get("content", "").strip())
current_count = self._get_token_count()
self._set_token_count(current_count + response.get("tokens_used", 0))
Expand Down Expand Up @@ -1570,13 +1639,19 @@ def _generate_byteplus_standard(

@profile("llm_anthropic_call", OperationCategory.LLM)
def _generate_anthropic(
self, system_prompt: str | None, user_prompt: str, call_type: Optional[str] = None
self, system_prompt: str | None, user_prompt: str,
call_type: Optional[str] = None,
messages: Optional[List[dict]] = None,
) -> Dict[str, Any]:
"""Generate response using Anthropic with prompt caching.

Anthropic's prompt caching uses `cache_control` markers on content blocks.
When the system prompt is long enough (≥1024 tokens), we enable caching.

For multi-turn sessions, pass pre-built `messages` with cache_control on the
last assistant message. This enables prefix caching of the entire conversation
history, not just the system prompt.

TTL Options:
- Default (5 minutes): Free, uses "ephemeral" type
- Extended (1 hour): When call_type is provided, uses extended TTL for better
Expand All @@ -1588,6 +1663,8 @@ def _generate_anthropic(
user_prompt: The user prompt for this request.
call_type: Optional call type (e.g., "reasoning", "action_selection").
When provided, uses extended 1-hour TTL for better cache hit rates.
messages: Optional pre-built messages list for multi-turn sessions.
When provided, used instead of building a single-turn message.

Cache hits are logged when `cache_read_input_tokens` > 0 in the response.
"""
Expand All @@ -1604,11 +1681,12 @@ def _generate_anthropic(
if not self._anthropic_client:
raise RuntimeError("Anthropic client was not initialised.")

# Build the message - rely on system prompt for JSON formatting
# Build the message - use pre-built messages for multi-turn, or single-turn
# Anthropic requires max_tokens; use 16384 (Claude 4 default) to avoid truncation
message_kwargs: Dict[str, Any] = {
"model": self.model,
"max_tokens": self.max_tokens,
"messages": [
"max_tokens": 16384,
"messages": messages if messages is not None else [
{"role": "user", "content": user_prompt},
],
}
Expand Down
3 changes: 2 additions & 1 deletion agent_core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"""Utility modules for agent-core."""

from agent_core.utils.logger import logger, define_log_level, configure_logging
from agent_core.utils.token import count_tokens

__all__ = ["logger", "define_log_level", "configure_logging"]
__all__ = ["logger", "define_log_level", "configure_logging", "count_tokens"]
37 changes: 37 additions & 0 deletions agent_core/utils/token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
"""
Token counting utilities using tiktoken.

Provides a cached tokenizer and token counting functions used
across agent_core and app layers.
"""

import tiktoken

# Ensure tiktoken extension encodings (cl100k_base, etc.) are registered.
# Required for tiktoken >= 0.12 and PyInstaller frozen builds.
try:
import tiktoken_ext.openai_public # noqa: F401
except ImportError:
pass

_tokenizer = None


def _get_tokenizer():
    """Return the process-wide tiktoken encoder, creating it on first use.

    The encoder is cached in a module-level global so repeated calls are cheap.
    """
    global _tokenizer
    if _tokenizer is not None:
        return _tokenizer
    try:
        encoding = tiktoken.get_encoding("cl100k_base")
    except Exception:
        # cl100k_base can be unavailable (e.g. stripped or frozen builds);
        # fall back to o200k_base in that case.
        encoding = tiktoken.get_encoding("o200k_base")
    _tokenizer = encoding
    return _tokenizer


def count_tokens(text: str) -> int:
    """Return the number of tiktoken tokens in *text* (0 for empty input)."""
    return len(_get_tokenizer().encode(text)) if text else 0
40 changes: 0 additions & 40 deletions agent_file_system/AGENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,46 +144,6 @@ When you learn something useful (user preferences, project context, solutions to
- Use `stream_edit` to update USER.md with user preferences you discover
- Use `stream_edit` to update AGENT.md with operational improvements

## Long Task - Research Note Caching

Your event stream summarizes older events to stay within token limits. This means detailed findings from earlier in a long task can be lost. To prevent this, cache your research by writing to files.

When to use:
- Any task involving extended research, investigation, or many action cycles
- When you're accumulating findings you'll need to reference later

How:
1. Create `workspace/research_<topic>.md` early in the task
2. Append findings as you go (every few action cycles)
3. Re-read the file when you need earlier findings that may no longer be in your event stream
4. Delete the file at task end if the findings are no longer needed, or keep it if they may be useful later

Think of files as your external memory - your event stream is your short-term memory (limited), files are your long-term memory (permanent).

## Missions - Multi-Session Task Management

A "mission" is an ongoing effort that spans multiple tasks across your lifetime. Missions solve the problem of losing context between separate task sessions.

Convention:
- Create a folder: `workspace/missions/<descriptive_name>/`
- Create an INDEX.md inside following the template at `app/data/agent_file_system_template/MISSION_INDEX_TEMPLATE.md`. Read the template first, then fill in the sections for your mission.
- Store all related notes and artifacts in the mission folder

Mission discovery (at the start of every complex task):
- Check `workspace/missions/` for existing missions
- If the current task relates to an existing mission, read its INDEX.md and work within it
- If the user says "this is part of mission X" or "continue mission X", link to that mission
- Create a new mission when: user requests it, task spans multiple sessions, or task continues previous work

At the end of a mission-linked task:
- Update INDEX.md with what was accomplished, current status, and next steps
- This is what enables the next task that picks up the mission to have full context

Missions vs MEMORY.md:
- MEMORY.md = "what I've learned" (permanent agent-wide knowledge: user prefs, patterns, references)
- Missions = "what I'm working on" (active project state: research data, findings, status)
- At mission completion, distill key learnings into MEMORY.md

## Proactive Behavior

You activate on schedules (hourly/daily/weekly/monthly).
Expand Down
Loading