Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions openviking/client/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async def add_message(

If both content and parts are provided, parts takes precedence.
"""
from datetime import datetime
from datetime import datetime, timezone

from openviking.message.part import Part, TextPart, part_from_dict

Expand All @@ -458,15 +458,8 @@ async def add_message(
else:
raise ValueError("Either content or parts must be provided")

# 解析 created_at
msg_created_at = None
if created_at:
try:
msg_created_at = datetime.fromisoformat(created_at)
except ValueError:
pass

session.add_message(role, message_parts, created_at=msg_created_at)
        # created_at 直接传递给 session (ISO string)
session.add_message(role, message_parts, created_at=created_at)
return {
"session_id": session_id,
"message_count": len(session.messages),
Expand Down
13 changes: 6 additions & 7 deletions openviking/message/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Message:
id: str
role: Literal["user", "assistant"]
parts: List[Part]
created_at: datetime = None
created_at: str = None

@property
def content(self) -> str:
Expand Down Expand Up @@ -64,13 +64,12 @@ def estimated_tokens(self) -> int:

def to_dict(self) -> dict:
"""Serialize to JSONL."""
created_at_val = self.created_at or datetime.now(timezone.utc)
created_at_str = format_iso8601(created_at_val)
created_at_val = self.created_at or datetime.now(timezone.utc).isoformat()
return {
"id": self.id,
"role": self.role,
"parts": [self._part_to_dict(p) for p in self.parts],
"created_at": created_at_str,
"created_at": created_at_val,
}

def _part_to_dict(self, part: Part) -> dict:
Expand Down Expand Up @@ -139,7 +138,7 @@ def from_dict(cls, data: dict) -> "Message":
id=data["id"],
role=data["role"],
parts=parts,
created_at=parse_iso_datetime(data["created_at"]),
created_at=data["created_at"],
)

@classmethod
Expand All @@ -151,7 +150,7 @@ def create_user(cls, content: str, msg_id: str = None) -> "Message":
id=msg_id or f"msg_{uuid4().hex}",
role="user",
parts=[TextPart(text=content)],
created_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc).isoformat(),
)

@classmethod
Expand Down Expand Up @@ -194,7 +193,7 @@ def create_assistant(
id=msg_id or f"msg_{uuid4().hex}",
role="assistant",
parts=parts,
created_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc).isoformat()
)

def get_context_parts(self) -> List[ContextPart]:
Expand Down
4 changes: 3 additions & 1 deletion openviking/models/vlm/backends/litellm_vlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def _call() -> Union[str, VLMResponse]:
response = completion(**kwargs)
elapsed = time.perf_counter() - t0
self._update_token_usage_from_response(response, duration_seconds=elapsed)
tracer.info(f'response={response}')
if tools:
return self._build_vlm_response(response, has_tools=True)
return self._clean_response(self._extract_content_from_response(response))
Expand All @@ -333,7 +334,7 @@ def _call() -> Union[str, VLMResponse]:
operation_name="LiteLLM VLM completion",
)

@tracer("vlm.call", ignore_result=False, ignore_args=["messages"])
@tracer("litellm.vlm.call", ignore_result=True, ignore_args=["messages"])
async def get_completion_async(
self,
prompt: str = "",
Expand All @@ -352,6 +353,7 @@ async def _call() -> Union[str, VLMResponse]:
response = await acompletion(**kwargs)
elapsed = time.perf_counter() - t0
self._update_token_usage_from_response(response, duration_seconds=elapsed)
tracer.info(f'response={response}')
if tools:
return self._build_vlm_response(response, has_tools=True)
return self._clean_response(self._extract_content_from_response(response))
Expand Down
2 changes: 1 addition & 1 deletion openviking/models/vlm/backends/openai_vlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def _call() -> Union[str, VLMResponse]:
operation_name="OpenAI VLM completion",
)

@tracer("vlm.call", ignore_result=True, ignore_args=["messages"])
@tracer("openai.vlm.call", ignore_result=True, ignore_args=["messages"])
async def get_completion_async(
self,
prompt: str = "",
Expand Down
2 changes: 1 addition & 1 deletion openviking/models/vlm/backends/volcengine_vlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_completion(
return result
return self._clean_response(str(result))

@tracer("vlm.call")
@tracer("volcengine.vlm.call", ignore_result=True, ignore_args=False)
async def get_completion_async(
self,
prompt: str = "",
Expand Down
12 changes: 6 additions & 6 deletions openviking/prompts/templates/memory/entities.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ memory_type: entities
description: |
Wikipedia article - manages page using Zettelkasten method.
Each page represents an article with relative path links to events.
Cards should be rich and distributed - avoid putting all info in one card.
  Entities should be rich and distributed - avoid putting all info in one entity.
directory: "viking://user/{{ user_space }}/memories/entities"
filename_template: "{{ category }}/{{ name }}.md"
enabled: true
Expand All @@ -25,10 +25,10 @@ fields:
description: |
- Detailed Zettelkasten card content in markdown format
Relative path format: events://{event_name} or ../events/{year}/{month}/{day}/{event_name}.md
- Example:
# LGBTQ+ events Caroline participated in:
- [Pride parade](../events/2023/03/05/Pride parade.md)
- [Caroline's school LGBTQ talk](events://Caroline's school LGBTQ talk)
- [Support group](../events/2024/03/10/Support group.md)
- Example:
# Book club events Caroline participated in:
- [Monthly book discussion](../events/2023/03/05/Monthly book discussion.md)
- [Author meetup](events://Author meetup)
- [Summer reading challenge](../events/2024/03/10/Summer reading challenge.md)

merge_op: patch
11 changes: 2 additions & 9 deletions openviking/server/routers/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,8 @@ async def add_message(
else:
parts = [TextPart(text=request.content or "")]

# 解析 created_at
created_at = None
if request.created_at:
try:
created_at = datetime.fromisoformat(request.created_at)
except ValueError:
logger.warning(f"Invalid created_at format: {request.created_at}")

session.add_message(request.role, parts, created_at=created_at)
# created_at 直接传递给 session (ISO string)
session.add_message(request.role, parts, created_at=request.created_at)
return Response(
status="ok",
result={
Expand Down
5 changes: 4 additions & 1 deletion openviking/session/compressor_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from openviking.message import Message
from openviking.server.identity import RequestContext
from openviking.session.memory import ExtractLoop, MemoryUpdater
from openviking.session.memory.utils.json_parser import JsonUtils
from openviking.storage import VikingDBManager
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry import get_current_telemetry
Expand Down Expand Up @@ -79,6 +80,7 @@ def _get_or_create_updater(self, registry, transaction_handle=None) -> MemoryUpd
registry=registry, vikingdb=self.vikingdb, transaction_handle=transaction_handle
)

@tracer()
async def extract_long_term_memories(
self,
messages: List[Message],
Expand All @@ -93,6 +95,7 @@ async def extract_long_term_memories(
Note: Returns empty List[Context] because v2 directly writes to storage.
The list length is used for stats in session.py.
"""

if not messages:
return []

Expand All @@ -101,8 +104,8 @@ async def extract_long_term_memories(
return []

tracer.info("Starting v2 memory extraction from conversation")
tracer.info(f"messages={JsonUtils.dumps(messages)}")
config = get_openviking_config()

# Initialize default memory files (soul.md, identity.md) if not exist
from openviking.session.memory.memory_type_registry import create_default_registry

Expand Down
78 changes: 64 additions & 14 deletions openviking/session/memory/extract_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def __init__(
self._read_files: Set[str] = set()
# Transaction handle for file locking
self._transaction_handle = None
# Flag to disable tools in next iteration after unknown tool error
self._disable_tools_for_iteration = False

async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:
"""
Expand Down Expand Up @@ -151,7 +153,7 @@ async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:
"role": "system",
"content": f"""
## Output Format
See the complete JSON Schema below:
The final output of the model must strictly follow the JSON Schema format shown below:
```json
{schema_str}
```
Expand All @@ -169,6 +171,19 @@ async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:
)
messages.extend(tool_call_messages)

# Track prefetched files in _read_files to avoid unnecessary refetch
for msg in tool_call_messages:
if msg.get("role") == "user" and "tool_call_name" in msg.get("content", ""):
import json
try:
content = json.loads(msg.get("content", "{}"))
if content.get("tool_call_name") == "read":
uri = content.get("args", {}).get("uri")
if uri:
self._read_files.add(uri)
except (json.JSONDecodeError, AttributeError):
pass

while iteration < max_iterations:
iteration += 1
tracer.info(f"ReAct iteration {iteration}/{max_iterations}")
Expand All @@ -187,13 +202,21 @@ async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:

# Call LLM with tools - model decides: tool calls OR final operations
pretty_print_messages(messages)
tool_calls, operations = await self._call_llm(messages, force_final=is_last_iteration)

tool_calls, operations = await self._call_llm(
messages
)

if tool_calls:
await self._execute_tool_calls(messages, tool_calls, tools_used)
has_unknown_tool = await self._execute_tool_calls(messages, tool_calls, tools_used)
# If model called an unknown tool, disable tools in next iteration
if has_unknown_tool:
self._disable_tools_for_iteration = True
tracer.info("Unknown tool called, will disable tools in next iteration")
# Allow one extra iteration for refetch
if iteration >= max_iterations:
max_iterations += 1
self._disable_tools_for_iteration = True
tracer.info(f"Extended max_iterations to {max_iterations} for tool call")
continue

Expand Down Expand Up @@ -222,7 +245,8 @@ async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:
if is_last_iteration:
final_operations = self._operations_model()
break
# Otherwise continue and try again
# Otherwise disable_tools and try again
self._disable_tools_for_iteration = True
continue

if final_operations is None:
Expand All @@ -236,7 +260,14 @@ async def run(self) -> Tuple[Optional[Any], List[Dict[str, Any]]]:
return final_operations, tools_used

@tracer("extract_loop.execute_tool_calls")
async def _execute_tool_calls(self, messages, tool_calls, tools_used):
async def _execute_tool_calls(self, messages, tool_calls, tools_used) -> bool:
"""
Execute tool calls in parallel.

Returns:
True if any tool call returned "Unknown tool" error, indicating
the model should not receive tools in the next iteration.
"""
# Execute all tool calls in parallel
async def execute_single_tool_call(idx: int, tool_call):
"""Execute a single tool call."""
Expand All @@ -248,8 +279,13 @@ async def execute_single_tool_call(idx: int, tool_call):
]
results = await self._execute_in_parallel(action_tasks)

has_unknown_tool = False

# Process results and add to messages
for _idx, tool_call, result in results:
# Check for unknown tool error
if isinstance(result, dict) and result.get("error", "").startswith("Unknown tool:"):
has_unknown_tool = True
# Skip if arguments is None
if tool_call.arguments is None:
logger.warning(f"Tool call {tool_call.name} has no arguments, skipping")
Expand All @@ -265,7 +301,8 @@ async def execute_single_tool_call(idx: int, tool_call):

# Track read tool calls for refetch detection
if tool_call.name == "read" and tool_call.arguments.get("uri"):
self._read_files.add(tool_call.arguments["uri"])
uri = tool_call.arguments["uri"]
self._read_files.add(uri)

add_tool_call_pair_to_messages(
messages,
Expand All @@ -275,6 +312,8 @@ async def execute_single_tool_call(idx: int, tool_call):
result=result,
)

return has_unknown_tool

def _validate_operations(self, operations: Any) -> None:
"""
Validate that all operations have allowed URIs.
Expand Down Expand Up @@ -308,8 +347,7 @@ def _validate_operations(self, operations: Any) -> None:

async def _call_llm(
self,
messages: List[Dict[str, Any]],
force_final: bool = False,
messages: List[Dict[str, Any]]
) -> Tuple[Optional[List], Optional[Any]]:
"""
Call LLM with tools. Returns either tool calls OR final operations.
Expand All @@ -325,13 +363,17 @@ async def _call_llm(
await self._mark_cache_breakpoint(messages)

# Call LLM with tools - use tools from strategy
tool_choice = "none" if force_final else None

tools = None
tool_choice = None
if not self._disable_tools_for_iteration:
tools = self._tool_schemas
tool_choice = "auto"
response = await self.vlm.get_completion_async(
messages=messages,
tools=self._tool_schemas,
tools=tools,
tool_choice=tool_choice,
)
tracer.info(f"response={response}")
# print(f'response={response}')
# Log cache hit info
if hasattr(response, "usage") and response.usage:
Expand All @@ -352,16 +394,24 @@ async def _call_llm(
f"[KVCache] prompt_tokens={prompt_tokens}, cached_tokens={cached_tokens}"
)

# Case 0: Handle string response (when tools are not provided) or None
if response is None:
content = ""
elif isinstance(response, str):
# When tools=None, VLM returns string instead of VLMResponse
content = response
# Case 1: LLM returned tool calls
if response.has_tool_calls:
elif response.has_tool_calls:
# Format tool calls nicely for debug logging
for tc in response.tool_calls:
tracer.info(f"[assistant tool_call] (id={tc.id}, name={tc.name})")
tracer.info(f" {json.dumps(tc.arguments, indent=2, ensure_ascii=False)}")
return (response.tool_calls, None)
else:
# Case 2: VLMResponse without tool calls - get content from response
content = response.content or ""

# Case 2: Try to parse operations from content with stability
content = response.content or ""
# Parse operations from content
if content:
try:
# print(f'LLM response content: {content}')
Expand Down
Loading
Loading