diff --git a/CHANGELOG.md b/CHANGELOG.md index 2579c1a36..76cbf5da2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Only write entries that are worth mentioning to users. ## Unreleased +- Core: Add `/export` command to export current session context (messages, metadata) to a Markdown file, and `/import` command to import context from a file or another session ID into the current session - Shell: Show token counts (used/total) alongside context usage percentage in the status bar (e.g., `context: 42.0% (4.2k/10.0k)`) - Shell: Rotate keyboard shortcut tips in the toolbar — tips cycle through available shortcuts on each prompt submission to save horizontal space - MCP: Add loading indicators for MCP server connections — Shell displays a "Connecting to MCP servers..." spinner and Web shows a status message while MCP tools are being loaded diff --git a/docs/en/guides/sessions.md b/docs/en/guides/sessions.md index acab88d48..8555bb12e 100644 --- a/docs/en/guides/sessions.md +++ b/docs/en/guides/sessions.md @@ -48,6 +48,39 @@ In addition to conversation history, Kimi Code CLI also automatically saves and This means you don't need to reconfigure these settings each time you resume a session. For example, if you approved auto-execution of certain shell commands in your previous session, those approvals remain in effect after resuming. +## Export and import + +Kimi Code CLI supports exporting session context to a file, or importing context from external files and other sessions. + +**Export a session** + +Enter `/export` to export the current session's complete conversation history as a Markdown file: + +``` +/export +``` + +The exported file includes session metadata, a conversation overview, and the complete conversation organized by turns. You can also specify an output path: + +``` +/export ~/exports/my-session.md +``` + +**Import context** + +Enter `/import` to import context from a file or another session. 
The imported content is appended as reference information to the current session: + +``` +/import ./previous-session-export.md +/import abc12345 +``` + +Common text-based file formats are supported (Markdown, source code, configuration files, etc.). You can also pass a session ID to import the complete conversation history from that session. + +::: tip +Exported files may contain sensitive information (such as code snippets, file paths, etc.). Please review before sharing. +::: + ## Clear and compact As the conversation progresses, the context grows longer. Kimi Code CLI will automatically compress the context when needed to ensure the conversation can continue. diff --git a/docs/en/reference/slash-commands.md b/docs/en/reference/slash-commands.md index 7f8afbfb8..936e5f96f 100644 --- a/docs/en/reference/slash-commands.md +++ b/docs/en/reference/slash-commands.md @@ -3,7 +3,7 @@ Slash commands are built-in commands for Kimi Code CLI, used to control sessions, configuration, and debugging. Enter a command starting with `/` in the input box to trigger. ::: tip Shell mode -Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, and `/feedback`. +Some slash commands are also available in shell mode, including `/help`, `/exit`, `/version`, `/editor`, `/changelog`, `/feedback`, `/export`, and `/import`. ::: ## Help and info @@ -110,6 +110,29 @@ Alias: `/resume` Use arrow keys to select a session, press `Enter` to confirm switch, press `Ctrl-C` to cancel. +### `/export` + +Export the current session context to a Markdown file for archiving or sharing. + +Usage: + +- `/export`: Export to the current working directory with an auto-generated filename (format: `kimi-export-<session-id>-<timestamp>.md`) +- `/export <path>`: Export to the specified path. 
If the path is a directory, the filename is auto-generated; if it is a file path, the content is written directly to that file + +The exported file includes: +- Session metadata (session ID, export time, working directory, message count, token count) +- Conversation overview (topic, number of turns, tool call count) +- Complete conversation history organized by turns, including user messages, AI responses, tool calls, and tool results + +### `/import` + +Import context from a file or another session into the current session. The imported content is appended as reference context, and the AI can use this information to inform subsequent interactions. + +Usage: + +- `/import <file>`: Import from a file. Supports common text-based formats such as Markdown, plain text, source code, and configuration files; binary files (e.g., images, PDFs, archives) are not supported +- `/import <session-id>`: Import from the specified session ID. Cannot import the current session into itself + ### `/clear` Clear the current session's context and start a new conversation. diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md index 736559e8c..82c19159e 100644 --- a/docs/en/release-notes/changelog.md +++ b/docs/en/release-notes/changelog.md @@ -4,6 +4,7 @@ This page documents the changes in each Kimi Code CLI release. ## Unreleased +- Core: Add `/export` command to export current session context (messages, metadata) to a Markdown file, and `/import` command to import context from a file or another session ID into the current session - Shell: Show token counts (used/total) alongside context usage percentage in the status bar (e.g., `context: 42.0% (4.2k/10.0k)`) - Shell: Rotate keyboard shortcut tips in the toolbar — tips cycle through available shortcuts on each prompt submission to save horizontal space - MCP: Add loading indicators for MCP server connections — Shell displays a "Connecting to MCP servers..." 
spinner and Web shows a status message while MCP tools are being loaded diff --git a/docs/zh/guides/sessions.md b/docs/zh/guides/sessions.md index 2e3e397a0..f9e5939a6 100644 --- a/docs/zh/guides/sessions.md +++ b/docs/zh/guides/sessions.md @@ -48,6 +48,39 @@ kimi --session abc123 这意味着你不需要在每次恢复会话时重新配置这些设置。例如,如果你在上次会话中批准了某类 Shell 命令的自动执行,恢复会话后这些批准仍然有效。 +## 导出与导入 + +Kimi Code CLI 支持将会话上下文导出为文件,或从外部文件和其他会话导入上下文。 + +**导出会话** + +输入 `/export` 可以将当前会话的完整对话历史导出为 Markdown 文件: + +``` +/export +``` + +导出文件包含会话元数据、对话概览和按轮次组织的完整对话记录。你也可以指定输出路径: + +``` +/export ~/exports/my-session.md +``` + +**导入上下文** + +输入 `/import` 可以从文件或其他会话导入上下文。导入的内容会作为参考信息附加到当前会话中: + +``` +/import ./previous-session-export.md +/import abc12345 +``` + +支持导入常见的文本格式文件(Markdown、代码、配置文件等)。你也可以传入一个会话 ID,从该会话导入完整的对话历史。 + +::: tip 提示 +导出文件可能包含敏感信息(如代码片段、文件路径等),分享前请注意检查。 +::: + ## 清空与压缩 随着对话的进行,上下文会越来越长。Kimi Code CLI 会在需要的时候自动对上下文进行压缩,确保对话能够继续。 diff --git a/docs/zh/reference/slash-commands.md b/docs/zh/reference/slash-commands.md index d3fe2ad34..6ca670df2 100644 --- a/docs/zh/reference/slash-commands.md +++ b/docs/zh/reference/slash-commands.md @@ -3,7 +3,7 @@ 斜杠命令是 Kimi Code CLI 的内置命令,用于控制会话、配置和调试。在输入框中输入 `/` 开头的命令即可触发。 ::: tip Shell 模式 -部分斜杠命令在 Shell 模式下也可以使用,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog` 和 `/feedback`。 +部分斜杠命令在 Shell 模式下也可以使用,包括 `/help`、`/exit`、`/version`、`/editor`、`/changelog`、`/feedback`、`/export` 和 `/import`。 ::: ## 帮助与信息 @@ -110,6 +110,29 @@ 使用方向键选择会话,按 `Enter` 确认切换,按 `Ctrl-C` 取消。 +### `/export` + +将当前会话的上下文导出为 Markdown 文件,方便归档或分享。 + +用法: + +- `/export`:导出到当前工作目录,文件名自动生成(格式为 `kimi-export-<会话ID前8位>-<时间戳>.md`) +- `/export <路径>`:导出到指定路径。如果路径是目录,文件名会自动生成;如果是文件路径,则直接写入该文件 + +导出文件包含: +- 会话元数据(会话 ID、导出时间、工作目录、消息数、token 数) +- 对话概览(主题、轮次数、工具调用次数) +- 完整的对话历史,按轮次组织,包括用户消息、AI 回复、工具调用和工具结果 + +### `/import` + +从文件或其他会话导入上下文到当前会话。导入的内容会作为参考上下文附加到当前对话中,AI 可以利用这些信息来辅助后续的交互。 + +用法: + +- `/import <文件>`:从文件导入。支持 Markdown、文本、代码、配置文件等常见文本格式;不支持二进制文件(如图片、PDF、压缩包) +- `/import <会话ID>`:从指定会话 ID 导入。不能导入当前会话自身 + ### `/clear` 
清空当前会话的上下文,开始新的对话。 diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md index 8bb4ff2aa..c2b5827da 100644 --- a/docs/zh/release-notes/changelog.md +++ b/docs/zh/release-notes/changelog.md @@ -4,6 +4,7 @@ ## 未发布 +- Core:新增 `/export` 命令,支持将当前会话上下文(消息、元数据)导出为 Markdown 文件;新增 `/import` 命令,支持从文件或其他会话 ID 导入上下文到当前会话 - Shell:在状态栏上下文用量旁显示 Token 数量(已用/总量),如 `context: 42.0% (4.2k/10.0k)` - Shell:工具栏快捷键提示改为轮转显示——每次提交后循环展示不同快捷键提示,节省横向空间 - MCP:为 MCP 服务器连接添加加载指示器——Shell 在连接 MCP 服务器时显示 "Connecting to MCP servers..." 加载动画,Web 在 MCP 工具加载期间显示状态消息 diff --git a/src/kimi_cli/soul/compaction.py b/src/kimi_cli/soul/compaction.py index a68033d9b..7742806be 100644 --- a/src/kimi_cli/soul/compaction.py +++ b/src/kimi_cli/soul/compaction.py @@ -35,13 +35,13 @@ def estimated_token_count(self) -> int: """ if self.usage is not None and len(self.messages) > 0: summary_tokens = self.usage.output - preserved_tokens = _estimate_text_tokens(self.messages[1:]) + preserved_tokens = estimate_text_tokens(self.messages[1:]) return summary_tokens + preserved_tokens - return _estimate_text_tokens(self.messages) + return estimate_text_tokens(self.messages) -def _estimate_text_tokens(messages: Sequence[Message]) -> int: +def estimate_text_tokens(messages: Sequence[Message]) -> int: """Estimate tokens from message text content using a character-based heuristic.""" total_chars = 0 for msg in messages: diff --git a/src/kimi_cli/soul/slash.py b/src/kimi_cli/soul/slash.py index c4d63c16e..71c23eb2b 100644 --- a/src/kimi_cli/soul/slash.py +++ b/src/kimi_cli/soul/slash.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import TYPE_CHECKING +from kaos.path import KaosPath from kosong.message import Message from loguru import logger @@ -13,6 +14,8 @@ from kimi_cli.soul.agent import load_agents_md from kimi_cli.soul.context import Context from kimi_cli.soul.message import system +from kimi_cli.utils.export import is_sensitive_file +from kimi_cli.utils.path import sanitize_cli_path, 
shorten_home from kimi_cli.utils.slashcmd import SlashCommandRegistry from kimi_cli.wire.types import StatusUpdate, TextPart @@ -103,7 +106,7 @@ async def add_dir(soul: KimiSoul, args: str): from kimi_cli.utils.path import is_within_directory, list_directory - args = args.strip() + args = sanitize_cli_path(args) if not args: if not soul.runtime.additional_dirs: wire_send(TextPart(text="No additional directories. Usage: /add-dir ")) @@ -169,3 +172,72 @@ async def add_dir(soul: KimiSoul, args: str): wire_send(TextPart(text=f"Added directory to workspace: {path}")) logger.info("Added additional directory: {path}", path=path) + + +@registry.command +async def export(soul: KimiSoul, args: str): + """Export current session context to a markdown file""" + from kimi_cli.utils.export import perform_export + + session = soul.runtime.session + result = await perform_export( + history=list(soul.context.history), + session_id=session.id, + work_dir=str(session.work_dir), + token_count=soul.context.token_count, + args=args, + default_dir=Path(str(session.work_dir)), + ) + if isinstance(result, str): + wire_send(TextPart(text=result)) + return + output, count = result + display = shorten_home(KaosPath(str(output))) + wire_send(TextPart(text=f"Exported {count} messages to {display}")) + wire_send( + TextPart( + text=" Note: The exported file may contain sensitive information. " + "Please be cautious when sharing it externally." 
+ ) + ) + + +@registry.command(name="import") +async def import_context(soul: KimiSoul, args: str): + """Import context from a file or session ID""" + from kimi_cli.utils.export import perform_import + + target = sanitize_cli_path(args) + if not target: + wire_send(TextPart(text="Usage: /import ")) + return + + session = soul.runtime.session + raw_max_context_size = ( + soul.runtime.llm.max_context_size if soul.runtime.llm is not None else None + ) + max_context_size = ( + raw_max_context_size + if isinstance(raw_max_context_size, int) and raw_max_context_size > 0 + else None + ) + result = await perform_import( + target=target, + current_session_id=session.id, + work_dir=session.work_dir, + context=soul.context, + max_context_size=max_context_size, + ) + if isinstance(result, str): + wire_send(TextPart(text=result)) + return + + source_desc, content_len = result + wire_send(TextPart(text=f"Imported context from {source_desc} ({content_len} chars).")) + if source_desc.startswith("file") and is_sensitive_file(Path(target).name): + wire_send( + TextPart( + text="Warning: This file may contain secrets (API keys, tokens, credentials). " + "The content is now part of your session context." 
+ ) + ) diff --git a/src/kimi_cli/ui/shell/export_import.py b/src/kimi_cli/ui/shell/export_import.py new file mode 100644 index 000000000..a1e28c4e7 --- /dev/null +++ b/src/kimi_cli/ui/shell/export_import.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from kaos.path import KaosPath + +from kimi_cli.ui.shell.console import console +from kimi_cli.ui.shell.slash import ensure_kimi_soul, registry, shell_mode_registry +from kimi_cli.utils.export import is_sensitive_file +from kimi_cli.utils.path import sanitize_cli_path, shorten_home +from kimi_cli.wire.types import TurnBegin, TurnEnd + +if TYPE_CHECKING: + from kimi_cli.ui.shell import Shell + + +# --------------------------------------------------------------------------- +# /export command +# --------------------------------------------------------------------------- + + +@registry.command +@shell_mode_registry.command +async def export(app: Shell, args: str): + """Export current session context to a markdown file""" + from kimi_cli.utils.export import perform_export + + soul = ensure_kimi_soul(app) + if soul is None: + return + + session = soul.runtime.session + result = await perform_export( + history=list(soul.context.history), + session_id=session.id, + work_dir=str(session.work_dir), + token_count=soul.context.token_count, + args=args, + default_dir=Path(str(session.work_dir)), + ) + if isinstance(result, str): + console.print(f"[yellow]{result}[/yellow]") + return + + output, count = result + display = shorten_home(KaosPath(str(output))) + console.print(f"[green]Exported {count} messages to {display}[/green]") + console.print( + "[yellow]Note: The exported file may contain sensitive information. 
" + "Please be cautious when sharing it externally.[/yellow]" + ) + + +# --------------------------------------------------------------------------- +# /import command +# --------------------------------------------------------------------------- + + +@registry.command(name="import") +@shell_mode_registry.command(name="import") +async def import_context(app: Shell, args: str): + """Import context from a file or session ID""" + from kimi_cli.utils.export import perform_import + + soul = ensure_kimi_soul(app) + if soul is None: + return + + target = sanitize_cli_path(args) + if not target: + console.print("[yellow]Usage: /import [/yellow]") + return + + session = soul.runtime.session + raw_max_context_size = ( + soul.runtime.llm.max_context_size if soul.runtime.llm is not None else None + ) + max_context_size = ( + raw_max_context_size + if isinstance(raw_max_context_size, int) and raw_max_context_size > 0 + else None + ) + result = await perform_import( + target=target, + current_session_id=session.id, + work_dir=session.work_dir, + context=soul.context, + max_context_size=max_context_size, + ) + if isinstance(result, str): + console.print(f"[red]{result}[/red]") + return + + source_desc, content_len = result + + # Write to wire file so the import appears in session replay + await soul.wire_file.append_message( + TurnBegin(user_input=f"[Imported context from {source_desc}]") + ) + await soul.wire_file.append_message(TurnEnd()) + + console.print( + f"[green]Imported context from {source_desc} " + f"({content_len} chars) into current session.[/green]" + ) + if source_desc.startswith("file") and is_sensitive_file(Path(target).name): + console.print( + "[yellow]Warning: This file may contain secrets (API keys, tokens, credentials). 
" + "The content is now part of your session context.[/yellow]" + ) diff --git a/src/kimi_cli/ui/shell/oauth.py b/src/kimi_cli/ui/shell/oauth.py index 70ff63a1a..e086b62d5 100644 --- a/src/kimi_cli/ui/shell/oauth.py +++ b/src/kimi_cli/ui/shell/oauth.py @@ -13,19 +13,12 @@ from kimi_cli.soul.kimisoul import KimiSoul from kimi_cli.ui.shell.console import console from kimi_cli.ui.shell.setup import select_platform, setup_platform -from kimi_cli.ui.shell.slash import registry +from kimi_cli.ui.shell.slash import ensure_kimi_soul, registry if TYPE_CHECKING: from kimi_cli.ui.shell import Shell -def _ensure_kimi_soul(app: Shell) -> KimiSoul | None: - if not isinstance(app.soul, KimiSoul): - console.print("[red]KimiSoul required[/red]") - return None - return app.soul - - async def _login_kimi_code(soul: KimiSoul) -> bool: status: Status | None = None ok = True @@ -68,7 +61,7 @@ def _current_model_key(soul: KimiSoul) -> str | None: @registry.command(aliases=["setup"]) async def login(app: Shell, args: str) -> None: """Login or setup a platform.""" - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return platform = await select_platform() @@ -88,7 +81,7 @@ async def login(app: Shell, args: str) -> None: @registry.command async def logout(app: Shell, args: str) -> None: """Logout from the current platform.""" - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return config = soul.runtime.config diff --git a/src/kimi_cli/ui/shell/slash.py b/src/kimi_cli/ui/shell/slash.py index b644a8997..a9c0a4fd8 100644 --- a/src/kimi_cli/ui/shell/slash.py +++ b/src/kimi_cli/ui/shell/slash.py @@ -32,7 +32,7 @@ shell_mode_registry = SlashCommandRegistry[ShellSlashCmdFunc]() -def _ensure_kimi_soul(app: Shell) -> KimiSoul | None: +def ensure_kimi_soul(app: Shell) -> KimiSoul | None: if not isinstance(app.soul, KimiSoul): console.print("[red]KimiSoul required[/red]") return None @@ -142,7 +142,7 @@ async def model(app: Shell, args: str): 
"""Switch LLM model or thinking mode""" from kimi_cli.llm import derive_model_capabilities - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return config = soul.runtime.config @@ -265,7 +265,7 @@ async def editor(app: Shell, args: str): """Set default external editor for Ctrl-O""" from kimi_cli.utils.editor import get_editor_command - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return config = soul.runtime.config @@ -402,7 +402,7 @@ def feedback(app: Shell, args: str): @registry.command(aliases=["reset"]) async def clear(app: Shell, args: str): """Clear the context""" - if _ensure_kimi_soul(app) is None: + if ensure_kimi_soul(app) is None: return await app.run_soul_command("/clear") raise Reload() @@ -411,7 +411,7 @@ async def clear(app: Shell, args: str): @registry.command async def new(app: Shell, args: str): """Start a new session""" - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return current_session = soul.runtime.session @@ -429,7 +429,7 @@ async def new(app: Shell, args: str): @registry.command(name="sessions", aliases=["resume"]) async def list_sessions(app: Shell, args: str): """List sessions and resume optionally""" - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return @@ -473,7 +473,7 @@ async def list_sessions(app: Shell, args: str): @registry.command def web(app: Shell, args: str): """Open Kimi Code Web UI in browser""" - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) session_id = soul.runtime.session.id if soul else None raise SwitchToWeb(session_id=session_id) @@ -487,7 +487,7 @@ async def mcp(app: Shell, args: str): from kimi_cli.soul.toolset import KimiToolset from kimi_cli.utils.rich.columns import BulletColumns - soul = _ensure_kimi_soul(app) + soul = ensure_kimi_soul(app) if soul is None: return toolset = soul.agent.toolset @@ -539,6 +539,7 @@ async def mcp(app: Shell, args: str): from . 
import ( # noqa: E402 debug, # noqa: F401 # type: ignore[reportUnusedImport] + export_import, # noqa: F401 # type: ignore[reportUnusedImport] oauth, # noqa: F401 # type: ignore[reportUnusedImport] setup, # noqa: F401 # type: ignore[reportUnusedImport] update, # noqa: F401 # type: ignore[reportUnusedImport] diff --git a/src/kimi_cli/utils/export.py b/src/kimi_cli/utils/export.py new file mode 100644 index 000000000..b3752c3fd --- /dev/null +++ b/src/kimi_cli/utils/export.py @@ -0,0 +1,685 @@ +from __future__ import annotations + +import json +from collections.abc import Sequence +from datetime import datetime +from pathlib import Path +from textwrap import shorten +from typing import TYPE_CHECKING, cast + +import aiofiles +from kaos.path import KaosPath +from kosong.message import Message + +from kimi_cli.soul.message import system +from kimi_cli.utils.message import message_stringify +from kimi_cli.utils.path import sanitize_cli_path +from kimi_cli.wire.types import ( + AudioURLPart, + ContentPart, + ImageURLPart, + TextPart, + ThinkPart, + ToolCall, + VideoURLPart, +) + +if TYPE_CHECKING: + from kimi_cli.soul.context import Context + +# --------------------------------------------------------------------------- +# Export helpers +# --------------------------------------------------------------------------- + +_HINT_KEYS = ("path", "file_path", "command", "query", "url", "name", "pattern") +"""Common tool-call argument keys whose values make good one-line hints.""" + + +def _is_checkpoint_message(msg: Message) -> bool: + """Check if a message is an internal checkpoint marker.""" + if msg.role != "user" or len(msg.content) != 1: + return False + part = msg.content[0] + return isinstance(part, TextPart) and part.text.strip().startswith("CHECKPOINT") + + +def _extract_tool_call_hint(args_json: str) -> str: + """Extract a brief human-readable hint from tool-call arguments. 
+ + Looks for well-known keys (path, command, …) and falls back to the first + short string value. Returns ``""`` when nothing useful is found. + """ + try: + parsed: object = json.loads(args_json) + except (json.JSONDecodeError, TypeError): + return "" + if not isinstance(parsed, dict): + return "" + args = cast(dict[str, object], parsed) + + # Prefer well-known keys + for key in _HINT_KEYS: + val = args.get(key) + if isinstance(val, str) and val.strip(): + return shorten(val, width=60, placeholder="…") + + # Fallback: first short string value + for val in args.values(): + if isinstance(val, str) and 0 < len(val) <= 80: + return shorten(val, width=60, placeholder="…") + + return "" + + +def _format_content_part_md(part: ContentPart) -> str: + """Convert a single ContentPart to markdown text.""" + match part: + case TextPart(text=text): + return text + case ThinkPart(think=think): + if not think.strip(): + return "" + return f"
<details><summary>Thinking</summary>\n\n{think}\n\n</details>
" + case ImageURLPart(): + return "[image]" + case AudioURLPart(): + return "[audio]" + case VideoURLPart(): + return "[video]" + case _: + return f"[{part.type}]" + + +def _format_tool_call_md(tool_call: ToolCall) -> str: + """Convert a ToolCall to a markdown sub-section with a readable title.""" + args_raw = tool_call.function.arguments or "{}" + hint = _extract_tool_call_hint(args_raw) + title = f"#### Tool Call: {tool_call.function.name}" + if hint: + title += f" (`{hint}`)" + + try: + args_formatted = json.dumps(json.loads(args_raw), indent=2, ensure_ascii=False) + except json.JSONDecodeError: + args_formatted = args_raw + + return f"{title}\n\n```json\n{args_formatted}\n```" + + +def _format_tool_result_md(msg: Message, tool_name: str, hint: str) -> str: + """Format a tool result message as a collapsible markdown block.""" + call_id = msg.tool_call_id or "unknown" + + # Use _format_content_part_md for consistency with the rest of the module + # (message_stringify loses ThinkPart and leaks tags) + result_parts: list[str] = [] + for part in msg.content: + text = _format_content_part_md(part) + if text.strip(): + result_parts.append(text) + result_text = "\n".join(result_parts) + + summary = f"Tool Result: {tool_name}" + if hint: + summary += f" (`{hint}`)" + + return ( + f"
<details><summary>{summary}</summary>\n\n" + f"\n" + f"{result_text}\n\n" + "</details>
" + ) + + +def _group_into_turns(history: Sequence[Message]) -> list[list[Message]]: + """Group messages into logical turns, each starting at a real user message.""" + turns: list[list[Message]] = [] + current: list[Message] = [] + + for msg in history: + if _is_checkpoint_message(msg): + continue + if msg.role == "user" and current: + turns.append(current) + current = [] + current.append(msg) + + if current: + turns.append(current) + return turns + + +def _format_turn_md(messages: list[Message], turn_number: int) -> str: + """Format a logical turn as a markdown section. + + A turn typically contains: + user message -> assistant (thinking + text + tool_calls) -> tool results + -> assistant (more text + tool_calls) -> tool results -> assistant (final) + All assistant/tool messages are grouped under a single ``### Assistant`` heading. + """ + lines: list[str] = [f"## Turn {turn_number}", ""] + + # tool_call_id -> (function_name, hint) + tool_call_info: dict[str, tuple[str, str]] = {} + assistant_header_written = False + + for msg in messages: + if _is_checkpoint_message(msg): + continue + + if msg.role == "user": + lines.append("### User") + lines.append("") + for part in msg.content: + text = _format_content_part_md(part) + if text.strip(): + lines.append(text) + lines.append("") + + elif msg.role == "assistant": + if not assistant_header_written: + lines.append("### Assistant") + lines.append("") + assistant_header_written = True + + # Content parts (thinking, text, media) + for part in msg.content: + text = _format_content_part_md(part) + if text.strip(): + lines.append(text) + lines.append("") + + # Tool calls + if msg.tool_calls: + for tc in msg.tool_calls: + hint = _extract_tool_call_hint(tc.function.arguments or "{}") + tool_call_info[tc.id] = (tc.function.name, hint) + lines.append(_format_tool_call_md(tc)) + lines.append("") + + elif msg.role == "tool": + tc_id = msg.tool_call_id or "" + name, hint = tool_call_info.get(tc_id, ("unknown", "")) + 
lines.append(_format_tool_result_md(msg, name, hint)) + lines.append("") + + elif msg.role in ("system", "developer"): + lines.append(f"### {msg.role.capitalize()}") + lines.append("") + for part in msg.content: + text = _format_content_part_md(part) + if text.strip(): + lines.append(text) + lines.append("") + + return "\n".join(lines) + + +def _build_overview( + history: Sequence[Message], + turns: list[list[Message]], + token_count: int, +) -> str: + """Build the Overview section from existing data (no LLM call).""" + # Topic: first real user message text, truncated + topic = "" + for msg in history: + if msg.role == "user" and not _is_checkpoint_message(msg): + topic = shorten(message_stringify(msg), width=80, placeholder="…") + break + + # Count tool calls across all messages + n_tool_calls = sum(len(msg.tool_calls) for msg in history if msg.tool_calls) + + lines = [ + "## Overview", + "", + f"- **Topic**: {topic}" if topic else "- **Topic**: (empty)", + f"- **Conversation**: {len(turns)} turns | " + f"{n_tool_calls} tool calls | {token_count:,} tokens", + "", + "---", + ] + return "\n".join(lines) + + +def build_export_markdown( + session_id: str, + work_dir: str, + history: Sequence[Message], + token_count: int, + now: datetime, +) -> str: + """Build the full export markdown string.""" + lines: list[str] = [ + "---", + f"session_id: {session_id}", + f"exported_at: {now.isoformat(timespec='seconds')}", + f"work_dir: {work_dir}", + f"message_count: {len(history)}", + f"token_count: {token_count}", + "---", + "", + "# Kimi Session Export", + "", + ] + + turns = _group_into_turns(history) + lines.append(_build_overview(history, turns, token_count)) + lines.append("") + + for idx, turn_messages in enumerate(turns): + lines.append(_format_turn_md(turn_messages, idx + 1)) + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Import helpers +# 
--------------------------------------------------------------------------- + +_IMPORTABLE_EXTENSIONS: frozenset[str] = frozenset( + { + # Markdown / plain text + ".md", + ".markdown", + ".txt", + ".text", + ".rst", + # Data / config + ".json", + ".jsonl", + ".yaml", + ".yml", + ".toml", + ".ini", + ".cfg", + ".conf", + ".csv", + ".tsv", + ".xml", + ".env", + ".properties", + # Source code + ".py", + ".js", + ".ts", + ".jsx", + ".tsx", + ".java", + ".kt", + ".go", + ".rs", + ".c", + ".cpp", + ".h", + ".hpp", + ".cs", + ".rb", + ".php", + ".swift", + ".scala", + ".sh", + ".bash", + ".zsh", + ".fish", + ".ps1", + ".bat", + ".cmd", + ".r", + ".R", + ".lua", + ".pl", + ".pm", + ".ex", + ".exs", + ".erl", + ".hs", + ".ml", + ".sql", + ".graphql", + ".proto", + # Web + ".html", + ".htm", + ".css", + ".scss", + ".sass", + ".less", + ".svg", + # Logs + ".log", + # Documentation + ".tex", + ".bib", + ".org", + ".adoc", + ".wiki", + } +) +"""File extensions accepted by ``/import``. Only text-based formats are +supported — importing binary files (images, PDFs, archives, …) is rejected +with a friendly message.""" + + +def is_importable_file(path_str: str) -> bool: + """Return True if *path_str* has an extension in the importable whitelist. + + Files with no extension are also accepted (could be READMEs, Makefiles, …). 
+ """ + suffix = Path(path_str).suffix.lower() + return suffix == "" or suffix in _IMPORTABLE_EXTENSIONS + + +def _stringify_content_parts(parts: Sequence[ContentPart]) -> str: + """Serialize a list of ContentParts to readable text, preserving ThinkPart.""" + segments: list[str] = [] + for part in parts: + match part: + case TextPart(text=text): + if text.strip(): + segments.append(text) + case ThinkPart(think=think): + if think.strip(): + segments.append(f"\n{think}\n") + case ImageURLPart(): + segments.append("[image]") + case AudioURLPart(): + segments.append("[audio]") + case VideoURLPart(): + segments.append("[video]") + case _: + segments.append(f"[{part.type}]") + return "\n".join(segments) + + +def _stringify_tool_calls(tool_calls: Sequence[ToolCall]) -> str: + """Serialize tool calls to readable text.""" + lines: list[str] = [] + for tc in tool_calls: + args_raw = tc.function.arguments or "{}" + try: + args = json.loads(args_raw) + args_str = json.dumps(args, ensure_ascii=False) + except (json.JSONDecodeError, TypeError): + args_str = args_raw + lines.append(f"Tool Call: {tc.function.name}({args_str})") + return "\n".join(lines) + + +def stringify_context_history(history: Sequence[Message]) -> str: + """Convert a sequence of Messages to a readable text transcript. + + Preserves ThinkPart content, tool call information, and tool results + so that an AI receiving the imported context has a complete picture. 
+ """ + parts: list[str] = [] + for msg in history: + if _is_checkpoint_message(msg): + continue + + role_label = msg.role.upper() + segments: list[str] = [] + + # Content parts (text, thinking, media) + content_text = _stringify_content_parts(msg.content) + if content_text.strip(): + segments.append(content_text) + + # Tool calls (only on assistant messages) + if msg.tool_calls: + segments.append(_stringify_tool_calls(msg.tool_calls)) + + if not segments: + continue + + header = f"[{role_label}]" + if msg.role == "tool" and msg.tool_call_id: + header = f"[{role_label}] (call_id: {msg.tool_call_id})" + + parts.append(f"{header}\n" + "\n".join(segments)) + return "\n\n".join(parts) + + +# --------------------------------------------------------------------------- +# Shared command logic +# --------------------------------------------------------------------------- + + +async def perform_export( + history: Sequence[Message], + session_id: str, + work_dir: str, + token_count: int, + args: str, + default_dir: Path, +) -> tuple[Path, int] | str: + """Perform the full export operation. + + Returns ``(output_path, message_count)`` on success, or an error message + string on failure. + """ + if not history: + return "No messages to export." + + now = datetime.now().astimezone() + short_id = session_id[:8] + default_name = f"kimi-export-{short_id}-{now.strftime('%Y%m%d-%H%M%S')}.md" + + cleaned = sanitize_cli_path(args) + if cleaned: + # sanitize_cli_path only strips quotes; it preserves trailing separators. + directory_hint = cleaned.endswith(("/", "\\")) + output = Path(cleaned).expanduser() + if not output.is_absolute(): + output = default_dir / output + # Keep explicit "directory intent" even when the directory does not exist yet. 
+ if directory_hint or output.is_dir(): + output = output / default_name + else: + output = default_dir / default_name + + content = build_export_markdown( + session_id=session_id, + work_dir=work_dir, + history=history, + token_count=token_count, + now=now, + ) + + try: + output.parent.mkdir(parents=True, exist_ok=True) + async with aiofiles.open(output, "w", encoding="utf-8") as f: + await f.write(content) + except OSError as e: + return f"Failed to write export file: {e}" + + return (output, len(history)) + + +MAX_IMPORT_SIZE = 10 * 1024 * 1024 # 10 MB +"""Maximum size (in bytes) of a file that can be imported via ``/import``.""" + +_SENSITIVE_FILE_PATTERNS: tuple[str, ...] = ( + ".env", + "credentials", + "secrets", + ".pem", + ".key", + ".p12", + ".pfx", + ".keystore", +) +"""File-name substrings that indicate potentially sensitive content.""" + + +def is_sensitive_file(filename: str) -> bool: + """Return True if *filename* looks like it may contain secrets.""" + name = filename.lower() + return any(pat in name for pat in _SENSITIVE_FILE_PATTERNS) + + +def _validate_import_token_budget( + estimated_tokens: int, + current_token_count: int, + max_context_size: int | None, +) -> str | None: + """Return an error if importing would push the session over the context budget. + + *estimated_tokens* is the pre-computed token estimate for the import + message. The check is ``current_token_count + estimated_tokens <= + max_context_size``. + """ + if max_context_size is None or max_context_size <= 0: + return None + + total_after_import = current_token_count + estimated_tokens + if total_after_import <= max_context_size: + return None + + return ( + "Imported content is too large for the current model context " + f"(~{estimated_tokens:,} import tokens + {current_token_count:,} existing " + f"= ~{total_after_import:,} total > {max_context_size:,} token limit). " + "Please import a smaller file or session." 
+ ) + + +async def resolve_import_source( + target: str, + current_session_id: str, + work_dir: KaosPath, +) -> tuple[str, str] | str: + """Resolve the import source to ``(content, source_desc)`` or an error message. + + This function handles I/O and source-level validation (file type, encoding, + byte-size cap). Session-level concerns like token budget are checked by + :func:`perform_import`. + """ + from kimi_cli.session import Session + from kimi_cli.soul.context import Context + + target_path = Path(target).expanduser() + if not target_path.is_absolute(): + target_path = Path(str(work_dir)) / target_path + + if target_path.exists() and target_path.is_dir(): + return "The specified path is a directory; please provide a file to import." + + if target_path.exists() and target_path.is_file(): + if not is_importable_file(target_path.name): + return ( + f"Unsupported file type '{target_path.suffix}'. " + "/import only supports text-based files " + "(e.g. .md, .txt, .json, .py, .log, …)." + ) + + try: + file_size = target_path.stat().st_size + except OSError as e: + return f"Failed to read file: {e}" + if file_size > MAX_IMPORT_SIZE: + limit_mb = MAX_IMPORT_SIZE // (1024 * 1024) + return ( + f"File is too large ({file_size / 1024 / 1024:.1f} MB). " + f"Maximum import size is {limit_mb} MB." + ) + + try: + async with aiofiles.open(target_path, encoding="utf-8") as f: + content = await f.read() + except UnicodeDecodeError: + return ( + f"Cannot import '{target_path.name}': " + "the file does not appear to be valid UTF-8 text." + ) + except OSError as e: + return f"Failed to read file: {e}" + + if not content.strip(): + return "The file is empty, nothing to import." + + return (content, f"file '{target_path.name}'") + + # Not a file on disk — try as session ID + if target == current_session_id: + return "Cannot import the current session into itself." 
+ + source_session = await Session.find(work_dir, target) + if source_session is None: + return f"'{target}' is not a valid file path or session ID." + + source_context = Context(source_session.context_file) + try: + restored = await source_context.restore() + except Exception as e: + return f"Failed to load source session: {e}" + if not restored or not source_context.history: + return "The source session has no messages." + + content = stringify_context_history(source_context.history) + content_bytes = len(content.encode("utf-8")) + if content_bytes > MAX_IMPORT_SIZE: + limit_mb = MAX_IMPORT_SIZE // (1024 * 1024) + actual_mb = content_bytes / 1024 / 1024 + return ( + f"Session content is too large ({actual_mb:.1f} MB). " + f"Maximum import size is {limit_mb} MB." + ) + return (content, f"session '{target}'") + + +def build_import_message(content: str, source_desc: str) -> Message: + """Build the ``Message`` to append to context for an import operation.""" + import_text = f'\n{content}\n' + return Message( + role="user", + content=[ + system( + f"The user has imported context from {source_desc}. " + "This is a prior conversation history that may be relevant " + "to the current session. " + "Please review this context and use it to inform your responses." + ), + TextPart(text=import_text), + ], + ) + + +async def perform_import( + target: str, + current_session_id: str, + work_dir: KaosPath, + context: Context, + max_context_size: int | None = None, +) -> tuple[str, int] | str: + """High-level import operation: resolve source, validate, build message, update context. + + Returns ``(source_desc, content_len)`` on success, or an error message + string. *content_len* is the raw imported content length in characters + (excluding wrapper markup), suitable for user-facing display. + The caller is responsible for any additional side-effects (wire file writes, + UI output, etc.). 
+ """ + from kimi_cli.soul.compaction import estimate_text_tokens + + result = await resolve_import_source( + target=target, + current_session_id=current_session_id, + work_dir=work_dir, + ) + if isinstance(result, str): + return result + + content, source_desc = result + message = build_import_message(content, source_desc) + + # Token budget check — reject before mutating context. + estimated = estimate_text_tokens([message]) + if error := _validate_import_token_budget(estimated, context.token_count, max_context_size): + return error + + await context.append_message(message) + await context.update_token_count(context.token_count + estimated) + + return (source_desc, len(content)) diff --git a/src/kimi_cli/utils/path.py b/src/kimi_cli/utils/path.py index 335b1121b..0107b58b9 100644 --- a/src/kimi_cli/utils/path.py +++ b/src/kimi_cli/utils/path.py @@ -100,6 +100,19 @@ def shorten_home(path: KaosPath) -> KaosPath: return path +def sanitize_cli_path(raw: str) -> str: + """Strip surrounding quotes from a CLI path argument. + + On macOS, dragging a file into the terminal wraps the path in single + quotes (e.g. ``'/path/to/file'``). This helper strips matching outer + quotes (single or double) so downstream path handling works correctly. + """ + raw = raw.strip() + if len(raw) >= 2 and ((raw[0] == "'" and raw[-1] == "'") or (raw[0] == '"' and raw[-1] == '"')): + raw = raw[1:-1] + return raw + + def is_within_directory(path: KaosPath, directory: KaosPath) -> bool: """ Check whether *path* is contained within *directory* using pure path semantics. 
diff --git a/tests/core/test_soul_import_command.py b/tests/core/test_soul_import_command.py new file mode 100644 index 000000000..d9ea500f5 --- /dev/null +++ b/tests/core/test_soul_import_command.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, Mock + +from kosong.message import Message + +from kimi_cli.soul import slash as soul_slash +from kimi_cli.wire.types import TextPart + + +def _make_soul(work_dir: Path) -> Mock: + from kimi_cli.soul.kimisoul import KimiSoul + + soul = Mock(spec=KimiSoul) + soul.runtime.session.work_dir = work_dir + soul.runtime.session.id = "soul-session-id" + soul.context.history = [] + soul.context.token_count = 50 + soul.context.append_message = AsyncMock() + soul.context.update_token_count = AsyncMock() + soul.wire_file.append_message = AsyncMock() + return soul + + +async def test_import_directory_path_reports_clear_error(tmp_path: Path, monkeypatch) -> None: + captured: list[TextPart] = [] + + def fake_wire_send(message: TextPart) -> None: + captured.append(message) + + monkeypatch.setattr(soul_slash, "wire_send", fake_wire_send) + + target_dir = tmp_path / "import-dir" + target_dir.mkdir() + + soul = Mock() + await soul_slash.import_context(soul, str(target_dir)) # type: ignore[reportGeneralTypeIssues] + + assert len(captured) == 1 + assert "directory" in captured[0].text.lower() + assert "provide a file" in captured[0].text.lower() + + +async def test_export_writes_file_and_sends_wire(tmp_path: Path, monkeypatch) -> None: + captured: list[TextPart] = [] + + def fake_wire_send(message: TextPart) -> None: + captured.append(message) + + monkeypatch.setattr(soul_slash, "wire_send", fake_wire_send) + + soul = _make_soul(tmp_path) + soul.context.history = [ + Message(role="user", content=[TextPart(text="Hello")]), + Message(role="assistant", content=[TextPart(text="Hi!")]), + ] + + output = tmp_path / "export.md" + await soul_slash.export(soul, str(output)) # type: 
ignore[reportGeneralTypeIssues] + + assert output.exists() + content = output.read_text(encoding="utf-8") + assert "# Kimi Session Export" in content + assert "Hello" in content + + # Should send export path + sensitive info warning + assert len(captured) == 2 + assert "Exported 2 messages" in captured[0].text + assert "sensitive information" in captured[1].text.lower() + + +async def test_import_file_sends_wire_markers(tmp_path: Path, monkeypatch) -> None: + captured: list[TextPart] = [] + + def fake_wire_send(message: TextPart) -> None: + captured.append(message) + + monkeypatch.setattr(soul_slash, "wire_send", fake_wire_send) + + soul = _make_soul(tmp_path) + source = tmp_path / "context.md" + source.write_text("important context from before", encoding="utf-8") + + await soul_slash.import_context(soul, str(source)) # type: ignore[reportGeneralTypeIssues] + + # Context message appended + assert soul.context.append_message.await_count == 1 + imported_msg = soul.context.append_message.await_args.args[0] + assert imported_msg.role == "user" + + # No direct wire_file writes — KimiSoul.run() handles TurnBegin/TurnEnd + assert soul.wire_file.append_message.await_count == 0 + + # Success message sent + assert len(captured) == 1 + assert "Imported context" in captured[0].text + + +async def test_import_env_file_sends_warning(tmp_path: Path, monkeypatch) -> None: + captured: list[TextPart] = [] + + def fake_wire_send(message: TextPart) -> None: + captured.append(message) + + monkeypatch.setattr(soul_slash, "wire_send", fake_wire_send) + + soul = _make_soul(tmp_path) + env_file = tmp_path / ".env" + env_file.write_text("API_KEY=secret123", encoding="utf-8") + + await soul_slash.import_context(soul, str(env_file)) # type: ignore[reportGeneralTypeIssues] + + assert len(captured) == 2 + assert "Imported context" in captured[0].text + assert "secrets" in captured[1].text.lower() diff --git a/tests/ui_and_conv/test_export_import.py b/tests/ui_and_conv/test_export_import.py new 
file mode 100644 index 000000000..017fb388a --- /dev/null +++ b/tests/ui_and_conv/test_export_import.py @@ -0,0 +1,1120 @@ +"""Tests for /export and /import slash commands.""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path + +from kosong.message import Message + +from kimi_cli.soul.message import system +from kimi_cli.utils.export import ( + _IMPORTABLE_EXTENSIONS, + _extract_tool_call_hint, + _format_content_part_md, + _format_tool_call_md, + _format_tool_result_md, + _group_into_turns, + _is_checkpoint_message, + _stringify_content_parts, + _stringify_tool_calls, + build_export_markdown, + build_import_message, + is_importable_file, + perform_export, + perform_import, + resolve_import_source, + stringify_context_history, +) +from kimi_cli.wire.types import ( + AudioURLPart, + ContentPart, + ImageURLPart, + TextPart, + ThinkPart, + ToolCall, + VideoURLPart, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_tool_call( + call_id: str = "call_001", + name: str = "bash", + arguments: str | None = '{"command": "ls"}', +) -> ToolCall: + return ToolCall( + id=call_id, + function=ToolCall.FunctionBody(name=name, arguments=arguments), + ) + + +def _make_checkpoint_message(checkpoint_id: int = 0) -> Message: + return Message( + role="user", + content=[system(f"CHECKPOINT {checkpoint_id}")], + ) + + +# --------------------------------------------------------------------------- +# _stringify_content_parts +# --------------------------------------------------------------------------- + + +class TestStringifyContentParts: + def test_text_part(self) -> None: + parts: list[ContentPart] = [TextPart(text="Hello world")] + result = _stringify_content_parts(parts) + assert result == "Hello world" + + def test_think_part_preserved(self) -> None: + parts: list[ContentPart] = [ThinkPart(think="Let me 
analyze this...")] + result = _stringify_content_parts(parts) + assert "" in result + assert "Let me analyze this..." in result + assert "" in result + + def test_mixed_content(self) -> None: + parts: list[ContentPart] = [ + ThinkPart(think="Thinking first"), + TextPart(text="Then responding"), + ] + result = _stringify_content_parts(parts) + assert "Thinking first" in result + assert "Then responding" in result + + def test_image_placeholder(self) -> None: + parts: list[ContentPart] = [ + ImageURLPart(image_url=ImageURLPart.ImageURL(url="https://example.com/img.png")), + ] + result = _stringify_content_parts(parts) + assert result == "[image]" + + def test_audio_placeholder(self) -> None: + parts: list[ContentPart] = [ + AudioURLPart(audio_url=AudioURLPart.AudioURL(url="https://example.com/audio.mp3")), + ] + result = _stringify_content_parts(parts) + assert result == "[audio]" + + def test_video_placeholder(self) -> None: + parts: list[ContentPart] = [ + VideoURLPart(video_url=VideoURLPart.VideoURL(url="https://example.com/video.mp4")), + ] + result = _stringify_content_parts(parts) + assert result == "[video]" + + def test_empty_text_skipped(self) -> None: + parts: list[ContentPart] = [TextPart(text=" "), TextPart(text="Real content")] + result = _stringify_content_parts(parts) + assert result == "Real content" + + def test_empty_think_skipped(self) -> None: + parts: list[ContentPart] = [ThinkPart(think=" "), TextPart(text="Response")] + result = _stringify_content_parts(parts) + assert result == "Response" + assert "" not in result + + +# --------------------------------------------------------------------------- +# _stringify_tool_calls +# --------------------------------------------------------------------------- + + +class TestStringifyToolCalls: + def test_single_tool_call(self) -> None: + tc = _make_tool_call(name="bash", arguments='{"command": "ls -la"}') + result = _stringify_tool_calls([tc]) + assert "Tool Call: bash(" in result + assert "ls -la" in 
result + + def test_multiple_tool_calls(self) -> None: + tc1 = _make_tool_call(call_id="c1", name="ReadFile", arguments='{"path": "a.py"}') + tc2 = _make_tool_call(call_id="c2", name="WriteFile", arguments='{"path": "b.py"}') + result = _stringify_tool_calls([tc1, tc2]) + assert "Tool Call: ReadFile(" in result + assert "Tool Call: WriteFile(" in result + assert "a.py" in result + assert "b.py" in result + + def test_invalid_json_arguments(self) -> None: + tc = _make_tool_call(name="test", arguments="not valid json") + result = _stringify_tool_calls([tc]) + assert "Tool Call: test(not valid json)" in result + + def test_none_arguments(self) -> None: + tc = _make_tool_call(name="test", arguments=None) + result = _stringify_tool_calls([tc]) + assert "Tool Call: test({})" in result + + +# --------------------------------------------------------------------------- +# stringify_context_history +# --------------------------------------------------------------------------- + + +class TestStringifyContextHistory: + def test_simple_user_assistant(self) -> None: + history: list[Message] = [ + Message(role="user", content=[TextPart(text="What is 1+1?")]), + Message(role="assistant", content=[TextPart(text="2")]), + ] + result = stringify_context_history(history) + assert "[USER]" in result + assert "What is 1+1?" in result + assert "[ASSISTANT]" in result + assert "2" in result + + def test_think_part_preserved_in_history(self) -> None: + """ThinkPart content must appear in the serialized output.""" + history: list[Message] = [ + Message(role="user", content=[TextPart(text="Explain X")]), + Message( + role="assistant", + content=[ + ThinkPart(think="Let me reason about X step by step..."), + TextPart(text="X is explained as follows..."), + ], + ), + ] + result = stringify_context_history(history) + assert "Let me reason about X step by step..." in result + assert "" in result + assert "X is explained as follows..." 
in result + + def test_tool_calls_preserved_in_history(self) -> None: + """Tool call information must appear in the serialized output.""" + tc = _make_tool_call(name="ReadFile", arguments='{"path": "main.py"}') + history: list[Message] = [ + Message(role="user", content=[TextPart(text="Read the file")]), + Message( + role="assistant", + content=[TextPart(text="Reading the file...")], + tool_calls=[tc], + ), + ] + result = stringify_context_history(history) + assert "Tool Call: ReadFile(" in result + assert "main.py" in result + + def test_tool_result_preserved_in_history(self) -> None: + """Tool result messages must appear with their call_id.""" + history: list[Message] = [ + Message( + role="tool", + content=[TextPart(text="file content here")], + tool_call_id="call_001", + ), + ] + result = stringify_context_history(history) + assert "[TOOL]" in result + assert "call_id: call_001" in result + assert "file content here" in result + + def test_checkpoint_messages_filtered(self) -> None: + """Checkpoint messages must not appear in the serialized output.""" + history: list[Message] = [ + Message(role="user", content=[TextPart(text="Hello")]), + _make_checkpoint_message(0), + Message(role="assistant", content=[TextPart(text="Hi there")]), + _make_checkpoint_message(1), + ] + result = stringify_context_history(history) + assert "CHECKPOINT" not in result + assert "Hello" in result + assert "Hi there" in result + + def test_full_conversation_round_trip(self) -> None: + """A complete conversation with thinking, tool calls, and results.""" + tc = _make_tool_call( + call_id="call_abc", + name="bash", + arguments='{"command": "echo hello"}', + ) + history: list[Message] = [ + Message(role="user", content=[TextPart(text="Run echo hello")]), + Message( + role="assistant", + content=[ + ThinkPart(think="User wants to run a command"), + TextPart(text="I'll run that for you."), + ], + tool_calls=[tc], + ), + Message( + role="tool", + content=[TextPart(text="hello\n")], + 
tool_call_id="call_abc", + ), + Message( + role="assistant", + content=[TextPart(text="The command output is: hello")], + ), + ] + result = stringify_context_history(history) + + # All key information must be present + assert "Run echo hello" in result # user message + assert "User wants to run a command" in result # thinking + assert "I'll run that for you." in result # assistant text + assert "Tool Call: bash(" in result # tool call + assert "echo hello" in result # tool args + assert "[TOOL] (call_id: call_abc)" in result # tool result header + assert "hello\n" in result # tool result content + assert "The command output is: hello" in result # final response + + def test_empty_messages_skipped(self) -> None: + """Messages with no content and no tool_calls should be skipped.""" + history: list[Message] = [ + Message(role="assistant", content=[TextPart(text="")]), + Message(role="user", content=[TextPart(text="Real message")]), + ] + result = stringify_context_history(history) + assert "[ASSISTANT]" not in result + assert "Real message" in result + + def test_system_role_preserved(self) -> None: + history: list[Message] = [ + Message(role="system", content=[TextPart(text="You are a helpful assistant")]), + ] + result = stringify_context_history(history) + assert "[SYSTEM]" in result + assert "You are a helpful assistant" in result + + +# --------------------------------------------------------------------------- +# _is_checkpoint_message +# --------------------------------------------------------------------------- + + +class TestIsCheckpointMessage: + def test_checkpoint_detected(self) -> None: + msg = _make_checkpoint_message(0) + assert _is_checkpoint_message(msg) is True + + def test_regular_user_message(self) -> None: + msg = Message(role="user", content=[TextPart(text="Hello")]) + assert _is_checkpoint_message(msg) is False + + def test_assistant_message_not_checkpoint(self) -> None: + msg = Message(role="assistant", content=[TextPart(text="CHECKPOINT 0")]) 
+        assert _is_checkpoint_message(msg) is False
+
+    def test_multi_part_message_not_checkpoint(self) -> None:
+        msg = Message(
+            role="user",
+            content=[
+                TextPart(text="CHECKPOINT 0"),
+                TextPart(text="extra"),
+            ],
+        )
+        assert _is_checkpoint_message(msg) is False
+
+
+# ---------------------------------------------------------------------------
+# _format_content_part_md (export side)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatContentPartMd:
+    def test_text_part(self) -> None:
+        result = _format_content_part_md(TextPart(text="Hello world"))
+        assert result == "Hello world"
+
+    def test_think_part_wrapped_in_details(self) -> None:
+        result = _format_content_part_md(ThinkPart(think="Reasoning here"))
+        assert "<details><summary>Thinking" in result
+        assert "Reasoning here" in result
+        assert "</details>" in result
+
+    def test_empty_think_part_returns_empty(self) -> None:
+        assert _format_content_part_md(ThinkPart(think="")) == ""
+        assert _format_content_part_md(ThinkPart(think=" ")) == ""
+
+    def test_image_placeholder(self) -> None:
+        part = ImageURLPart(image_url=ImageURLPart.ImageURL(url="https://example.com/img.png"))
+        assert _format_content_part_md(part) == "[image]"
+
+    def test_audio_placeholder(self) -> None:
+        part = AudioURLPart(audio_url=AudioURLPart.AudioURL(url="https://example.com/a.mp3"))
+        assert _format_content_part_md(part) == "[audio]"
+
+    def test_video_placeholder(self) -> None:
+        part = VideoURLPart(video_url=VideoURLPart.VideoURL(url="https://example.com/v.mp4"))
+        assert _format_content_part_md(part) == "[video]"
+
+
+# ---------------------------------------------------------------------------
+# _extract_tool_call_hint
+# ---------------------------------------------------------------------------
+
+
+class TestExtractToolCallHint:
+    def test_known_key_path(self) -> None:
+        result = _extract_tool_call_hint('{"path": "/src/main.py"}')
+        assert result == "/src/main.py"
+
+    def test_known_key_command(self) -> None:
+        result = _extract_tool_call_hint('{"command": "ls -la"}')
+        assert result == "ls -la"
+
+    def test_fallback_to_first_short_string(self) -> None:
+        result = _extract_tool_call_hint('{"foo": "bar"}')
+        assert result == "bar"
+
+    def test_empty_on_invalid_json(self) -> None:
+        assert _extract_tool_call_hint("not json") == ""
+
+    def test_empty_on_non_dict(self) -> None:
+        assert _extract_tool_call_hint("[1, 2, 3]") == ""
+
+    def test_empty_on_no_string_values(self) -> None:
+        assert _extract_tool_call_hint('{"count": 42}') == ""
+
+    def test_long_value_truncated(self) -> None:
+        long_val = "a" * 100
+        result = _extract_tool_call_hint(f'{{"path": "{long_val}"}}')
+        assert len(result) <= 60
+        assert result.endswith("…")
+
+
+# ---------------------------------------------------------------------------
+# _format_tool_call_md
+# ---------------------------------------------------------------------------
+
+
+class TestFormatToolCallMd:
+    def test_basic_tool_call(self) -> None:
+        tc = _make_tool_call(call_id="c1", name="bash", arguments='{"command": "ls"}')
+        result = _format_tool_call_md(tc)
+        assert "#### Tool Call: bash" in result
+        assert "(`ls`)" in result  # hint extracted
+        assert "call_id: c1" in result
+        assert "```json" in result
+
+    def test_invalid_json_arguments(self) -> None:
+        tc = _make_tool_call(name="test", arguments="not json")
+        result = _format_tool_call_md(tc)
+        assert "#### Tool Call: test" in result
+        assert "not json" in result
+
+    def test_no_hint_when_no_string_args(self) -> None:
+        tc = _make_tool_call(name="test", arguments='{"count": 42}')
+        result = _format_tool_call_md(tc)
+        assert "#### Tool Call: test\n" in result  # no hint in parens
+
+
+# ---------------------------------------------------------------------------
+# _format_tool_result_md
+# ---------------------------------------------------------------------------
+
+
+class TestFormatToolResultMd:
+    def test_basic_tool_result(self) -> None:
+        msg = Message(
+            role="tool",
+            content=[TextPart(text="output text")],
+            tool_call_id="c1",
+        )
+        result = _format_tool_result_md(msg, "bash", "ls")
+        assert "<details><summary>Tool Result: bash (`ls`)" in result
+        assert "call_id: c1" in result
+        assert "output text" in result
+        assert "</details>" in result
+
+    def test_system_tagged_content_preserved(self) -> None:
+        """Tool results with <system> tags should still include the text."""
+        msg = Message(
+            role="tool",
+            content=[system("ERROR: command failed"), TextPart(text="stderr output")],
+            tool_call_id="c2",
+        )
+        result = _format_tool_result_md(msg, "bash", "")
+        assert "command failed" in result
+        assert "stderr output" in result
+
+    def test_no_hint(self) -> None:
+        msg = Message(
+            role="tool",
+            content=[TextPart(text="data")],
+            tool_call_id="c1",
+        )
+        result = _format_tool_result_md(msg, "ReadFile", "")
+        assert "Tool Result: ReadFile" in result
+        assert "(`" not in result
+
+
+# ---------------------------------------------------------------------------
+# _group_into_turns
+# ---------------------------------------------------------------------------
+
+
+class TestGroupIntoTurns:
+    def test_single_turn(self) -> None:
+        history = [
+            Message(role="user", content=[TextPart(text="Hello")]),
+            Message(role="assistant", content=[TextPart(text="Hi")]),
+        ]
+        turns = _group_into_turns(history)
+        assert len(turns) == 1
+        assert len(turns[0]) == 2
+
+    def test_multiple_turns(self) -> None:
+        history = [
+            Message(role="user", content=[TextPart(text="Q1")]),
+            Message(role="assistant", content=[TextPart(text="A1")]),
+            Message(role="user", content=[TextPart(text="Q2")]),
+            Message(role="assistant", content=[TextPart(text="A2")]),
+        ]
+        turns = _group_into_turns(history)
+        assert len(turns) == 2
+
+    def test_checkpoints_excluded_from_turns(self) -> None:
+        """Checkpoint messages must be filtered out entirely during grouping."""
+        history = [
+            Message(role="user", content=[TextPart(text="Q1")]),
+            _make_checkpoint_message(0),
+            Message(role="assistant", content=[TextPart(text="A1")]),
+        ]
+        turns = _group_into_turns(history)
+        assert len(turns) == 1
+        assert len(turns[0]) == 2  # user + assistant (checkpoint filtered out)
+
+    def test_leading_checkpoints_no_empty_turn(self) -> None:
+        """Checkpoints before the
first real user message must not produce an empty turn.""" + history = [ + _make_checkpoint_message(0), + _make_checkpoint_message(1), + Message(role="user", content=[TextPart(text="Hello")]), + Message(role="assistant", content=[TextPart(text="Hi")]), + ] + turns = _group_into_turns(history) + assert len(turns) == 1 + assert turns[0][0].role == "user" + + def test_system_messages_before_first_user(self) -> None: + """System messages before first user message form a separate initial group.""" + history = [ + Message(role="system", content=[TextPart(text="System prompt")]), + Message(role="user", content=[TextPart(text="Hello")]), + Message(role="assistant", content=[TextPart(text="Hi")]), + ] + turns = _group_into_turns(history) + assert len(turns) == 2 + # First group: system message only + assert turns[0][0].role == "system" + # Second group: user + assistant + assert turns[1][0].role == "user" + assert len(turns[1]) == 2 + + +# --------------------------------------------------------------------------- +# build_export_markdown +# --------------------------------------------------------------------------- + + +class TestBuildExportMarkdown: + def test_contains_yaml_frontmatter(self) -> None: + history = [ + Message(role="user", content=[TextPart(text="Hello")]), + Message(role="assistant", content=[TextPart(text="Hi")]), + ] + now = datetime(2026, 3, 2, 12, 0, 0) + result = build_export_markdown( + session_id="test-session", + work_dir="/tmp/work", + history=history, + token_count=1000, + now=now, + ) + assert "session_id: test-session" in result + assert "exported_at: 2026-03-02T12:00:00" in result + assert "work_dir: /tmp/work" in result + assert "message_count: 2" in result + assert "token_count: 1000" in result + + def test_contains_overview_and_turns(self) -> None: + history = [ + Message(role="user", content=[TextPart(text="What is 2+2?")]), + Message(role="assistant", content=[TextPart(text="4")]), + ] + now = datetime(2026, 1, 1) + result = 
build_export_markdown(
+            session_id="s1",
+            work_dir="/w",
+            history=history,
+            token_count=100,
+            now=now,
+        )
+        assert "## Overview" in result
+        assert "## Turn 1" in result
+        assert "### User" in result
+        assert "What is 2+2?" in result
+        assert "### Assistant" in result
+        assert "4" in result
+
+    def test_tool_calls_in_export(self) -> None:
+        """Full round-trip: user -> assistant with tool call -> tool result -> final."""
+        tc = _make_tool_call(call_id="c1", name="bash", arguments='{"command": "echo hi"}')
+        history = [
+            Message(role="user", content=[TextPart(text="Run echo hi")]),
+            Message(
+                role="assistant",
+                content=[TextPart(text="Running...")],
+                tool_calls=[tc],
+            ),
+            Message(
+                role="tool",
+                content=[TextPart(text="hi\n")],
+                tool_call_id="c1",
+            ),
+            Message(
+                role="assistant",
+                content=[TextPart(text="Done.")],
+            ),
+        ]
+        now = datetime(2026, 1, 1)
+        result = build_export_markdown(
+            session_id="s1",
+            work_dir="/w",
+            history=history,
+            token_count=500,
+            now=now,
+        )
+        assert "Tool Call: bash" in result
+        assert "echo hi" in result
+        assert "Tool Result: bash" in result
+        assert "hi\n" in result
+        assert "Done." in result
+
+
+# ---------------------------------------------------------------------------
+# is_importable_file
+# ---------------------------------------------------------------------------
+
+
+class TestIsImportableFile:
+    def test_markdown(self) -> None:
+        assert is_importable_file("notes.md") is True
+
+    def test_txt(self) -> None:
+        assert is_importable_file("readme.txt") is True
+
+    def test_python(self) -> None:
+        assert is_importable_file("main.py") is True
+
+    def test_json(self) -> None:
+        assert is_importable_file("data.json") is True
+
+    def test_log(self) -> None:
+        assert is_importable_file("server.log") is True
+
+    def test_no_extension_accepted(self) -> None:
+        assert is_importable_file("Makefile") is True
+        assert is_importable_file("README") is True
+
+    def test_binary_rejected(self) -> None:
+        assert is_importable_file("photo.png") is False
+        assert is_importable_file("archive.zip") is False
+        assert is_importable_file("document.pdf") is False
+        assert is_importable_file("binary.exe") is False
+        assert is_importable_file("image.jpg") is False
+
+    def test_case_insensitive(self) -> None:
+        assert is_importable_file("README.MD") is True
+        assert is_importable_file("config.YAML") is True
+        assert is_importable_file("style.CSS") is True
+
+    def test_importable_extensions_is_frozenset(self) -> None:
+        assert isinstance(_IMPORTABLE_EXTENSIONS, frozenset)
+
+
+# ---------------------------------------------------------------------------
+# perform_export
+# ---------------------------------------------------------------------------
+
+_SIMPLE_HISTORY = [
+    Message(role="user", content=[TextPart(text="Hello")]),
+    Message(role="assistant", content=[TextPart(text="Hi!")]),
+]
+
+
+class TestPerformExport:
+    async def test_empty_history_returns_error(self, tmp_path: Path) -> None:
+        result = await perform_export(
+            history=[],
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=0,
+            args="",
+            default_dir=tmp_path,
+        )
+        assert result == "No messages to export."
+
+    async def test_writes_to_specified_file(self, tmp_path: Path) -> None:
+        output = tmp_path / "my-export.md"
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args=str(output),
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, tuple)
+        path, count = result
+        assert path == output
+        assert count == 2
+        assert output.exists()
+        content = output.read_text()
+        assert "# Kimi Session Export" in content
+        assert "Hello" in content
+
+    async def test_uses_default_dir_when_no_args(self, tmp_path: Path) -> None:
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args="",
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, tuple)
+        path, _ = result
+        assert path.parent == tmp_path
+        assert path.name.startswith("kimi-export-abc12345")
+        assert path.name.endswith(".md")
+
+    async def test_dir_arg_appends_default_name(self, tmp_path: Path) -> None:
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args=str(tmp_path),
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, tuple)
+        path, _ = result
+        assert path.parent == tmp_path
+        assert path.name.startswith("kimi-export-abc12345")
+
+    async def test_trailing_separator_uses_directory_semantics_when_missing(
+        self, tmp_path: Path
+    ) -> None:
+        export_dir = tmp_path / "exports"
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args=f"{export_dir}/",
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, tuple)
+        path, _ = result
+        assert path.parent == export_dir
+        assert path.name.startswith("kimi-export-abc12345")
+        assert export_dir.exists() and export_dir.is_dir()
+        assert path.exists()
+
+    async def test_creates_parent_dirs(self, tmp_path: Path) -> None:
+        nested = tmp_path / "a" / "b" / "export.md"
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args=str(nested),
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, tuple)
+        assert nested.exists()
+
+    async def test_write_error_returns_message(self, tmp_path: Path) -> None:
+        # Point to a path where parent cannot be created (file masquerading as dir)
+        blocker = tmp_path / "blocker"
+        blocker.write_text("x")
+        bad_path = blocker / "sub" / "export.md"
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=100,
+            args=str(bad_path),
+            default_dir=tmp_path,
+        )
+        assert isinstance(result, str)
+        assert "Failed to write export file" in result
+
+
+# ---------------------------------------------------------------------------
+# resolve_import_source
+# ---------------------------------------------------------------------------
+
+
+class TestResolveImportSource:
+    async def test_directory_returns_error(self, tmp_path: Path) -> None:
+        target_dir = tmp_path / "some-dir"
+        target_dir.mkdir()
+        result = await resolve_import_source(str(target_dir), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "directory" in result.lower()
+
+    async def test_unsupported_file_type_returns_error(self, tmp_path: Path) -> None:
+        img = tmp_path / "photo.png"
+        img.write_bytes(b"\x89PNG")
+        result = await resolve_import_source(str(img), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "Unsupported file type" in result
+
+    async def test_empty_file_returns_error(self, tmp_path: Path) -> None:
+        empty = tmp_path / "empty.md"
+        empty.write_text(" \n ")
+        result = await resolve_import_source(str(empty), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "empty" in result.lower()
+
+    async def test_binary_content_returns_error(self, tmp_path: Path) -> None:
+        bad = tmp_path / "data.txt"
+        bad.write_bytes(b"\xff\xfe" + b"\x00" * 100)
+        result = await resolve_import_source(str(bad), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "UTF-8" in result
+
+    async def test_self_import_returns_error(self, tmp_path: Path) -> None:
+        result = await resolve_import_source("curr-id", "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "Cannot import the current session" in result
+
+    async def test_nonexistent_session_returns_error(self, tmp_path: Path, monkeypatch) -> None:
+        from kimi_cli.session import Session
+
+        async def fake_find(_work_dir, _target):
+            return None
+
+        monkeypatch.setattr(Session, "find", fake_find)
+        result = await resolve_import_source("no-such-id", "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "not a valid file path or session ID" in result
+
+    async def test_file_too_large_returns_error(self, tmp_path: Path, monkeypatch) -> None:
+        import kimi_cli.utils.export as export_mod
+
+        monkeypatch.setattr(export_mod, "MAX_IMPORT_SIZE", 10)  # 10 bytes
+        big = tmp_path / "big.md"
+        big.write_text("x" * 100, encoding="utf-8")
+        result = await resolve_import_source(str(big), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "too large" in result.lower()
+
+    async def test_session_content_too_large_returns_error(
+        self, tmp_path: Path, monkeypatch
+    ) -> None:
+        import kimi_cli.utils.export as export_mod
+        from kimi_cli.session import Session
+
+        # Mock Session.find to return a fake session
+        fake_session = type("FakeSession", (), {"context_file": tmp_path / "ctx.jsonl"})()
+
+        async def fake_find(_work_dir, _target):
+            return fake_session
+
+        monkeypatch.setattr(Session, "find", fake_find)
+
+        # Mock Context to return a large history
+        big_text = "x" * 200
+        fake_history = [Message(role="user", content=[TextPart(text=big_text)])]
+
+        class FakeContext:
+            def __init__(self, _path):
+                self.history = fake_history
+
+            async def restore(self):
+                return True
+
+        from kimi_cli.soul import context as context_mod
+
+        monkeypatch.setattr(context_mod, "Context", FakeContext)
+        monkeypatch.setattr(export_mod, "MAX_IMPORT_SIZE", 10)  # 10 bytes
+
+        result = await resolve_import_source("other-id", "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "too large" in result.lower()
+
+    async def test_session_restore_failure_returns_error(self, tmp_path: Path, monkeypatch) -> None:
+        from kimi_cli.session import Session
+        from kimi_cli.soul import context as context_mod
+
+        fake_session = type("FakeSession", (), {"context_file": tmp_path / "ctx.jsonl"})()
+
+        async def fake_find(_work_dir, _target):
+            return fake_session
+
+        monkeypatch.setattr(Session, "find", fake_find)
+
+        class FailingContext:
+            def __init__(self, _path):
+                self.history = []
+
+            async def restore(self):
+                raise RuntimeError("corrupt context file")
+
+        monkeypatch.setattr(context_mod, "Context", FailingContext)
+
+        result = await resolve_import_source("other-id", "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, str)
+        assert "Failed to load source session" in result
+
+    async def test_successful_file_import(self, tmp_path: Path) -> None:
+        src = tmp_path / "context.md"
+        src.write_text("some important context", encoding="utf-8")
+        result = await resolve_import_source(str(src), "curr-id", tmp_path)  # type: ignore[arg-type]
+        assert isinstance(result, tuple)
+        content, source_desc = result
+        assert content == "some important context"
+        assert "context.md" in source_desc
+
+
+# ---------------------------------------------------------------------------
+# perform_export — edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestPerformExportRelativePath:
+    async def test_relative_path_anchored_to_default_dir(self, tmp_path: Path) -> None:
+        """A relative output path must resolve against default_dir, not process CWD."""
+        work = tmp_path / "project"
+        work.mkdir()
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir=str(work),
+            token_count=100,
+            args="subdir/my-export.md",
+            default_dir=work,
+        )
+        assert isinstance(result, tuple)
+        path, _ = result
+        assert path == work / "subdir" / "my-export.md"
+        assert path.exists()
+
+    async def test_absolute_path_not_affected(self, tmp_path: Path) -> None:
+        """Absolute paths must not be re-anchored to default_dir."""
+        work = tmp_path / "project"
+        work.mkdir()
+        abs_output = tmp_path / "elsewhere" / "out.md"
+        result = await perform_export(
+            history=_SIMPLE_HISTORY,
+            session_id="abc12345",
+            work_dir=str(work),
+            token_count=100,
+            args=str(abs_output),
+            default_dir=work,
+        )
+        assert isinstance(result, tuple)
+        path, _ = result
+        assert path == abs_output
+        assert path.exists()
+
+
+class TestResolveImportRelativePath:
+    async def test_relative_path_anchored_to_work_dir(self, tmp_path: Path) -> None:
+        """A relative import path must resolve against work_dir, not process CWD."""
+        work = tmp_path / "project"
+        work.mkdir()
+        src = work / "notes.md"
+        src.write_text("important notes", encoding="utf-8")
+        result = await resolve_import_source("notes.md", "curr-id", work)  # type: ignore[arg-type]
+        assert isinstance(result, tuple)
+        content, desc = result
+        assert content == "important notes"
+        assert "notes.md" in desc
+
+    async def test_absolute_path_not_affected(self, tmp_path: Path) -> None:
+        """Absolute paths must not be re-anchored to work_dir."""
+        work = tmp_path / "project"
+        work.mkdir()
+        outside = tmp_path / "other" / "data.txt"
+        outside.parent.mkdir(parents=True)
+        outside.write_text("external data", encoding="utf-8")
+        result = await resolve_import_source(str(outside), "curr-id", work)  # type: ignore[arg-type]
+        assert isinstance(result, tuple)
+        content, _ = result
+        assert content == "external data"
+
+
+class TestPerformExportEdgeCases:
+    async def test_checkpoint_only_history_still_exports(self, tmp_path: Path) -> None:
+        """History with only checkpoint messages should still export (they are filtered in turns)."""
+        from kimi_cli.soul.message import system as sys_msg
+
+        history = [
+            Message(role="user", content=[sys_msg("CHECKPOINT 0")]),
+            Message(role="user", content=[sys_msg("CHECKPOINT 1")]),
+        ]
+        result = await perform_export(
+            history=history,
+            session_id="abc12345",
+            work_dir="/tmp",
+            token_count=0,
+            args="",
+            default_dir=tmp_path,
+        )
+        # Not empty (history has 2 messages), but turns will be empty
+        assert isinstance(result, tuple)
+        path, count = result
+        assert count == 2
+        content = path.read_text()
+        assert "# Kimi Session Export" in content
+
+
+# ---------------------------------------------------------------------------
+# build_import_message
+# ---------------------------------------------------------------------------
+
+
+class TestBuildImportMessage:
+    def test_returns_user_message_with_expected_structure(self) -> None:
+        msg = build_import_message("hello world", "file 'test.md'")
+        assert msg.role == "user"
+        assert len(msg.content) == 2
+
+        # First part is a system hint
+        first = msg.content[0]
+        assert isinstance(first, TextPart)
+        assert "imported context" in first.text.lower()
+
+        # Second part contains the wrapped content
+        second = msg.content[1]
+        assert isinstance(second, TextPart)
+        assert "" in second.text
+        assert "hello world" in second.text
+        assert "" in second.text
+
+
+# ---------------------------------------------------------------------------
+# perform_import
+# ---------------------------------------------------------------------------
+
+
+def _make_mock_context(token_count: int = 0):
+    """Create a minimal mock context for perform_import tests."""
+    from unittest.mock import AsyncMock
+
+    ctx = AsyncMock()
+    ctx.token_count = token_count
+    return ctx
+
+
+class TestPerformImport:
+    async def test_file_exceeding_model_context_budget_returns_error(self, tmp_path: Path) -> None:
+        src = tmp_path / "context.md"
+        src.write_text("x" * 2000, encoding="utf-8")
+        ctx = _make_mock_context(token_count=0)
+        result = await perform_import(
+            str(src),
+            "curr-id",
+            tmp_path,  # type: ignore[arg-type]
+            context=ctx,
+            max_context_size=128,
+        )
+        assert isinstance(result, str)
+        assert "model context" in result.lower()
+        assert "import tokens" in result.lower()
+        # Context must NOT be mutated on failure.
+        ctx.append_message.assert_not_awaited()
+        ctx.update_token_count.assert_not_awaited()
+
+    async def test_file_within_model_context_budget_succeeds(self, tmp_path: Path) -> None:
+        src = tmp_path / "small.md"
+        src.write_text("small context", encoding="utf-8")
+        ctx = _make_mock_context(token_count=0)
+        result = await perform_import(
+            str(src),
+            "curr-id",
+            tmp_path,  # type: ignore[arg-type]
+            context=ctx,
+            max_context_size=4096,
+        )
+        assert isinstance(result, tuple)
+        source_desc, content_len = result
+        assert source_desc == "file 'small.md'"
+        assert content_len == len("small context")
+        ctx.append_message.assert_awaited_once()
+        ctx.update_token_count.assert_awaited_once()
+
+    async def test_existing_context_pushes_import_over_budget(self, tmp_path: Path) -> None:
+        """Import that fits alone but exceeds budget with existing context tokens."""
+        src = tmp_path / "medium.md"
+        src.write_text("a" * 100, encoding="utf-8")
+        # current_token_count near the limit — should fail.
+        ctx = _make_mock_context(token_count=180)
+        result = await perform_import(
+            str(src),
+            "curr-id",
+            tmp_path,  # type: ignore[arg-type]
+            context=ctx,
+            max_context_size=200,
+        )
+        assert isinstance(result, str)
+        assert "model context" in result.lower()
+        assert "existing" in result.lower()
+        ctx.append_message.assert_not_awaited()
+
+    async def test_session_exceeding_model_context_budget_returns_error(
+        self, tmp_path: Path, monkeypatch
+    ) -> None:
+        """Session import that exceeds model context budget is rejected."""
+        from kimi_cli.session import Session
+        from kimi_cli.soul import context as context_mod
+
+        fake_session = type("FakeSession", (), {"context_file": tmp_path / "ctx.jsonl"})()
+
+        async def fake_find(_work_dir, _target):
+            return fake_session
+
+        monkeypatch.setattr(Session, "find", fake_find)
+
+        big_text = "x" * 2000
+        fake_history = [Message(role="user", content=[TextPart(text=big_text)])]
+
+        class FakeContext:
+            def __init__(self, _path):
+                self.history = fake_history
+
+            async def restore(self):
+                return True
+
+        monkeypatch.setattr(context_mod, "Context", FakeContext)
+
+        ctx = _make_mock_context(token_count=0)
+        result = await perform_import(
+            "other-id",
+            "curr-id",
+            tmp_path,  # type: ignore[arg-type]
+            context=ctx,
+            max_context_size=128,
+        )
+        assert isinstance(result, str)
+        assert "model context" in result.lower()
+        ctx.append_message.assert_not_awaited()
+
+    async def test_returns_raw_content_len(self, tmp_path: Path) -> None:
+        """content_len must equal the raw content length, not the wrapped message."""
+        src = tmp_path / "data.txt"
+        raw = "hello world"
+        src.write_text(raw, encoding="utf-8")
+        ctx = _make_mock_context(token_count=0)
+        result = await perform_import(
+            str(src),
+            "curr-id",
+            tmp_path,  # type: ignore[arg-type]
+            context=ctx,
+        )
+        assert isinstance(result, tuple)
+        _desc, content_len = result
+        assert content_len == len(raw)
diff --git a/tests/ui_and_conv/test_shell_export_import_commands.py b/tests/ui_and_conv/test_shell_export_import_commands.py
new file mode 100644
index 000000000..b745fa241
--- /dev/null
+++ b/tests/ui_and_conv/test_shell_export_import_commands.py
@@ -0,0 +1,130 @@
+from __future__ import annotations
+
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, Mock
+
+from kosong.message import Message
+
+from kimi_cli.session import Session
+from kimi_cli.ui.shell import export_import as shell_export_import
+from kimi_cli.wire.types import TextPart, TurnBegin, TurnEnd
+
+
+def _make_shell_app(work_dir: Path) -> Mock:
+    from kimi_cli.soul.kimisoul import KimiSoul
+
+    soul = Mock(spec=KimiSoul)
+    soul.runtime.session.work_dir = work_dir
+    soul.runtime.session.id = "curr-session-id"
+    soul.context.history = []
+    soul.context.token_count = 123
+    soul.context.append_message = AsyncMock()
+    soul.context.update_token_count = AsyncMock()
+    soul.wire_file.append_message = AsyncMock()
+
+    app = Mock()
+    app.soul = soul
+    return app
+
+
+async def test_export_writes_markdown_file(tmp_path: Path) -> None:
+    app = _make_shell_app(tmp_path)
+    app.soul.context.history = [
+        Message(role="user", content=[TextPart(text="Hello")]),
+        Message(role="assistant", content=[TextPart(text="Hi!")]),
+    ]
+
+    output = tmp_path / "session.md"
+    await shell_export_import.export(app, str(output))  # type: ignore[reportGeneralTypeIssues]
+
+    assert output.exists()
+    content = output.read_text(encoding="utf-8")
+    assert "# Kimi Session Export" in content
+    assert "session_id: curr-session-id" in content
+    assert "Hello" in content
+    assert "Hi!" in content
+
+
+async def test_import_from_file_appends_message_and_wire_markers(tmp_path: Path) -> None:
+    app = _make_shell_app(tmp_path)
+    source_file = tmp_path / "source.md"
+    source_file.write_text("previous conversation context", encoding="utf-8")
+
+    await shell_export_import.import_context(app, str(source_file))  # type: ignore[reportGeneralTypeIssues]
+
+    assert app.soul.context.append_message.await_count == 1
+    imported_message = app.soul.context.append_message.await_args.args[0]
+    assert imported_message.role == "user"
+
+    imported_text = next(
+        p.text
+        for p in imported_message.content
+        if isinstance(p, TextPart) and "" in p.text
+    )
+    assert "previous conversation context" in imported_text
+
+    wire_calls = app.soul.wire_file.append_message.await_args_list
+    assert isinstance(wire_calls[0].args[0], TurnBegin)
+    assert isinstance(wire_calls[-1].args[0], TurnEnd)
+
+
+async def test_import_from_session_id(tmp_path: Path, monkeypatch) -> None:
+    app = _make_shell_app(tmp_path)
+
+    source_context_file = tmp_path / "source_context.jsonl"
+    source_message = Message(
+        role="user",
+        content=[TextPart(text="Question from old session")],
+    )
+    source_context_file.write_text(
+        source_message.model_dump_json(exclude_none=True) + "\n",
+        encoding="utf-8",
+    )
+
+    async def fake_find(_work_dir: Path, _target: str) -> SimpleNamespace:
+        return SimpleNamespace(context_file=source_context_file)
+
+    monkeypatch.setattr(Session, "find", fake_find)
+
+    await shell_export_import.import_context(app, "old-session-id")  # type: ignore[reportGeneralTypeIssues]
+
+    assert app.soul.context.append_message.await_count == 1
+    imported_message = app.soul.context.append_message.await_args.args[0]
+    imported_text = next(
+        p.text
+        for p in imported_message.content
+        if isinstance(p, TextPart) and "" in p.text
+    )
+    assert "Question from old session" in imported_text
+
+
+async def test_import_directory_shows_error(tmp_path: Path, monkeypatch) -> None:
+    app = _make_shell_app(tmp_path)
+    target_dir = tmp_path / "context-dir"
+    target_dir.mkdir()
+
+    print_mock = Mock()
+    monkeypatch.setattr(shell_export_import.console, "print", print_mock)
+
+    await shell_export_import.import_context(app, str(target_dir))  # type: ignore[reportGeneralTypeIssues]
+
+    assert print_mock.called
+    rendered = " ".join(str(arg) for args in print_mock.call_args_list for arg in args.args)
+    assert "directory" in rendered.lower()
+    assert "provide a file" in rendered.lower()
+    assert app.soul.context.append_message.await_count == 0
+    assert app.soul.wire_file.append_message.await_count == 0
diff --git a/tests/utils/test_utils_path.py b/tests/utils/test_utils_path.py
index 034210d73..61cc2bfb1 100644
--- a/tests/utils/test_utils_path.py
+++ b/tests/utils/test_utils_path.py
@@ -5,7 +5,9 @@
 import asyncio
 from pathlib import Path
 
-from kimi_cli.utils.path import next_available_rotation
+import pytest
+
+from kimi_cli.utils.path import next_available_rotation, sanitize_cli_path
 
 
 async def test_next_available_rotation_empty_dir(tmp_path):
@@ -217,3 +219,37 @@ async def test_next_available_rotation_concurrent_calls(tmp_path):
         "events_4.log",
         "events_5.log",
     }
+
+
+# ---------------------------------------------------------------------------
+# sanitize_cli_path tests
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.parametrize(
+    "raw, expected",
+    [
+        # macOS drag-and-drop: single quotes
+        ("'/Users/me/file.txt'", "/Users/me/file.txt"),
+        # double quotes
+        ('"/Users/me/file.txt"', "/Users/me/file.txt"),
+        # leading/trailing whitespace + quotes
+        (" '/Users/me/file.txt' ", "/Users/me/file.txt"),
+        # plain path – no change
+        ("/Users/me/file.txt", "/Users/me/file.txt"),
+        # empty string
+        ("", ""),
+        # whitespace only
+        (" ", ""),
+        # single quote char – not a pair
+        ("'", "'"),
+        # mismatched quotes – no stripping
+        ("'/Users/me/file.txt\"", "'/Users/me/file.txt\""),
+        # quotes inside path – should be kept
+        ("/Users/it's/a path", "/Users/it's/a path"),
+        # path with spaces inside quotes (common macOS drag)
+        ("'/Users/me/my docs/file.txt'", "/Users/me/my docs/file.txt"),
+    ],
+)
+def test_sanitize_cli_path(raw: str, expected: str):
+    assert sanitize_cli_path(raw) == expected
diff --git a/tests_e2e/test_wire_protocol.py b/tests_e2e/test_wire_protocol.py
index dbf15cb39..c5560abc0 100644
--- a/tests_e2e/test_wire_protocol.py
+++ b/tests_e2e/test_wire_protocol.py
@@ -65,6 +65,16 @@ def test_initialize_handshake(tmp_path) -> None:
             "description": "Add a directory to the workspace. Usage: /add-dir . Run without args to list added dirs",
             "aliases": [],
         },
+        {
+            "name": "export",
+            "description": "Export current session context to a markdown file",
+            "aliases": [],
+        },
+        {
+            "name": "import",
+            "description": "Import context from a file or session ID",
+            "aliases": [],
+        },
         {
             "name": "skill:kimi-cli-help",
             "description": "Answer Kimi Code CLI usage, configuration, and troubleshooting questions. Use when user asks about Kimi Code CLI installation, setup, configuration, slash commands, keyboard shortcuts, MCP integration, providers, environment variables, how something works internally, or any questions about Kimi Code CLI itself.",
@@ -137,6 +147,16 @@ def test_initialize_external_tool_conflict(tmp_path) -> None:
             "description": "Add a directory to the workspace. Usage: /add-dir . Run without args to list added dirs",
             "aliases": [],
         },
+        {
+            "name": "export",
+            "description": "Export current session context to a markdown file",
+            "aliases": [],
+        },
+        {
+            "name": "import",
+            "description": "Import context from a file or session ID",
+            "aliases": [],
+        },
         {
             "name": "skill:kimi-cli-help",
             "description": "Answer Kimi Code CLI usage, configuration, and troubleshooting questions. Use when user asks about Kimi Code CLI installation, setup, configuration, slash commands, keyboard shortcuts, MCP integration, providers, environment variables, how something works internally, or any questions about Kimi Code CLI itself.",