diff --git a/AGENTS.md b/AGENTS.md index 9f3617ce9c..9e40cc0049 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,6 +19,26 @@ pnpm dev Runs on `http://localhost:3000` by default. +## Pre-commit setup + +AstrBot uses [pre-commit](https://pre-commit.com/) hooks to automatically format and lint Python code before each commit. The hooks run `ruff check`, `ruff format`, and `pyupgrade` (see [`.pre-commit-config.yaml`](.pre-commit-config.yaml) for details). + +To set it up: + +```bash +pip install pre-commit +pre-commit install +``` + +After installation, the hooks will run automatically on `git commit`. You can also run them manually at any time: + +```bash +ruff format . +ruff check . +``` + +> **Note:** If you use VSCode, install the `Ruff` extension for real-time formatting and linting in the editor. + ## Dev environment tips 1. When modifying the WebUI, be sure to maintain componentization and clean code. Avoid duplicate code. diff --git a/astrbot/core/astr_agent_tool_exec.py b/astrbot/core/astr_agent_tool_exec.py index d668dfeec9..de5caad554 100644 --- a/astrbot/core/astr_agent_tool_exec.py +++ b/astrbot/core/astr_agent_tool_exec.py @@ -31,6 +31,9 @@ from astrbot.core.provider.entites import ProviderRequest from astrbot.core.provider.register import llm_tools from astrbot.core.tools.computer_tools import ( + CuaKeyboardTypeTool, + CuaMouseClickTool, + CuaScreenshotTool, ExecuteShellTool, FileDownloadTool, FileEditTool, @@ -186,7 +189,9 @@ def _get_runtime_computer_tools( cls, runtime: str, tool_mgr, + booter: str | None = None, ) -> dict[str, FunctionTool]: + booter = "" if booter is None else str(booter).lower() if runtime == "sandbox": shell_tool = tool_mgr.get_builtin_tool(ExecuteShellTool) python_tool = tool_mgr.get_builtin_tool(PythonTool) @@ -196,7 +201,7 @@ def _get_runtime_computer_tools( write_tool = tool_mgr.get_builtin_tool(FileWriteTool) edit_tool = tool_mgr.get_builtin_tool(FileEditTool) grep_tool = tool_mgr.get_builtin_tool(GrepTool) - return { + tools = 
{ shell_tool.name: shell_tool, python_tool.name: python_tool, upload_tool.name: upload_tool, @@ -206,6 +211,18 @@ def _get_runtime_computer_tools( edit_tool.name: edit_tool, grep_tool.name: grep_tool, } + if booter == "cua": + screenshot_tool = tool_mgr.get_builtin_tool(CuaScreenshotTool) + mouse_click_tool = tool_mgr.get_builtin_tool(CuaMouseClickTool) + keyboard_type_tool = tool_mgr.get_builtin_tool(CuaKeyboardTypeTool) + tools.update( + { + screenshot_tool.name: screenshot_tool, + mouse_click_tool.name: mouse_click_tool, + keyboard_type_tool.name: keyboard_type_tool, + } + ) + return tools if runtime == "local": shell_tool = tool_mgr.get_builtin_tool(ExecuteShellTool) python_tool = tool_mgr.get_builtin_tool(LocalPythonTool) @@ -242,6 +259,7 @@ def _build_handoff_toolset( runtime_computer_tools = cls._get_runtime_computer_tools( runtime, tool_mgr, + provider_settings.get("sandbox", {}).get("booter"), ) # Keep persona semantics aligned with the main agent: tools=None means diff --git a/astrbot/core/astr_main_agent.py b/astrbot/core/astr_main_agent.py index 87cb2db064..b013f010e1 100644 --- a/astrbot/core/astr_main_agent.py +++ b/astrbot/core/astr_main_agent.py @@ -47,6 +47,9 @@ BrowserExecTool, CreateSkillCandidateTool, CreateSkillPayloadTool, + CuaKeyboardTypeTool, + CuaMouseClickTool, + CuaScreenshotTool, EvaluateSkillCandidateTool, ExecuteShellTool, FileDownloadTool, @@ -1015,6 +1018,22 @@ def _apply_sandbox_tools( req.func_tool.add_tool(tool_mgr.get_builtin_tool(RollbackSkillReleaseTool)) req.func_tool.add_tool(tool_mgr.get_builtin_tool(SyncSkillReleaseTool)) + if booter == "cua": + req.system_prompt += ( + "\n[CUA Desktop Control]\n" + "Use `astrbot_execute_shell` with `background=true` to launch GUI apps. " + 'Use Firefox for browser tasks, for example `firefox "https://example.com"`. ' + "After each visible step, call `astrbot_cua_screenshot` with " + "`send_to_user=true` and `return_image_to_llm=true` so the user can " + "monitor progress. 
When typing, inspect the screenshot first and confirm " + "the target field is focused and empty or safe to append to. Use " + "`astrbot_cua_mouse_click` for coordinates and `astrbot_cua_keyboard_type` " + "for text input; use text=`\\n` for Enter.\n" + ) + req.func_tool.add_tool(tool_mgr.get_builtin_tool(CuaScreenshotTool)) + req.func_tool.add_tool(tool_mgr.get_builtin_tool(CuaMouseClickTool)) + req.func_tool.add_tool(tool_mgr.get_builtin_tool(CuaKeyboardTypeTool)) + req.system_prompt = f"{req.system_prompt or ''}\n{SANDBOX_MODE_PROMPT}\n" diff --git a/astrbot/core/computer/booters/base.py b/astrbot/core/computer/booters/base.py index 4c74e5edd6..c39032d4bb 100644 --- a/astrbot/core/computer/booters/base.py +++ b/astrbot/core/computer/booters/base.py @@ -1,6 +1,7 @@ from ..olayer import ( BrowserComponent, FileSystemComponent, + GUIComponent, PythonComponent, ShellComponent, ) @@ -29,6 +30,10 @@ def capabilities(self) -> tuple[str, ...] | None: def browser(self) -> BrowserComponent | None: return None + @property + def gui(self) -> GUIComponent | None: + return None + async def boot(self, session_id: str) -> None: ... async def shutdown(self) -> None: ... 
diff --git a/astrbot/core/computer/booters/cua.py b/astrbot/core/computer/booters/cua.py new file mode 100644 index 0000000000..dd72a0aa8a --- /dev/null +++ b/astrbot/core/computer/booters/cua.py @@ -0,0 +1,830 @@ +from __future__ import annotations + +import base64 +import inspect +import shlex +from dataclasses import asdict, dataclass, is_dataclass +from pathlib import Path +from typing import Any + +from astrbot.api import logger + +from ..olayer import FileSystemComponent, GUIComponent, PythonComponent, ShellComponent +from .base import ComputerBooter +from .cua_defaults import CUA_CONFIG_KEYS, CUA_DEFAULT_CONFIG +from .shipyard_search_file_util import search_files_via_shell + +_POSIX_OS_TYPES = {"linux", "darwin", "macos"} + +_CUA_BACKGROUND_LAUNCHER = """ +import subprocess, sys, time + +p = subprocess.Popen( + ["sh", "-lc", sys.argv[1]], + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, +) +sys.stdout.write(str(p.pid) + "\\n") +sys.stdout.flush() +time.sleep(0.2) +code = p.poll() +sys.exit(0 if code is None else code) +""".strip() + + +async def _maybe_await(value: Any) -> Any: + if inspect.isawaitable(value): + return await value + return value + + +def build_cua_booter_kwargs(sandbox_cfg: dict[str, Any]) -> dict[str, Any]: + return { + name: sandbox_cfg.get(config_key, CUA_DEFAULT_CONFIG[name]) + for name, config_key in CUA_CONFIG_KEYS.items() + } + + +async def _write_base64_via_shell( + shell: ShellComponent, + path: str, + data: bytes, +) -> dict[str, Any]: + encoded = base64.b64encode(data).decode("ascii") + decoder = ( + "import base64,pathlib,sys; " + "pathlib.Path(sys.argv[1]).write_bytes(base64.b64decode(sys.stdin.read()))" + ) + return await shell.exec( + f"python3 -c {shlex.quote(decoder)} {shlex.quote(path)} <<'EOF'\n{encoded}\nEOF" + ) + + +@dataclass(slots=True) +class ProcessResult: + stdout: str + stderr: str + exit_code: int | None + success: bool + + +def 
_maybe_model_dump(value: Any) -> dict[str, Any]: + if isinstance(value, dict): + return value + if is_dataclass(value) and not isinstance(value, type): + return asdict(value) + if hasattr(value, "model_dump"): + dumped = value.model_dump() + if isinstance(dumped, dict): + return dumped + if hasattr(value, "dict"): + dumped = value.dict() + if isinstance(dumped, dict): + return dumped + attr_payload = { + key: getattr(value, key) + for key in ( + "stdout", + "stderr", + "output", + "error", + "returncode", + "return_code", + "exit_code", + "success", + ) + if hasattr(value, key) + } + if attr_payload: + return attr_payload + return {} + + +def _slice_content_by_lines( + content: str, + *, + offset: int | None = None, + limit: int | None = None, +) -> str: + lines = content.splitlines(keepends=True) + start = 0 if offset is None else offset + selected = lines[start:] if limit is None else lines[start : start + limit] + return "".join(selected) + + +def _normalize_process_result(raw: Any) -> ProcessResult: + """Best-effort normalization for the process shapes returned by CUA SDKs.""" + payload = _maybe_model_dump(raw) + if not payload and isinstance(raw, str): + payload = {"stdout": raw} + + def first_text(*keys: str) -> str: + for key in keys: + value = payload.get(key) + if value is not None: + return str(value) + return "" + + stdout = first_text("stdout", "output") + stderr = first_text("stderr", "error") + exit_code = payload.get("exit_code") + if exit_code is None: + exit_code = payload.get("returncode") + if exit_code is None: + exit_code = payload.get("return_code") + if exit_code is None: + exit_code = 0 if not stderr else 1 + success = bool(payload.get("success", not stderr and exit_code in (0, None))) + return ProcessResult( + stdout=stdout, + stderr=stderr, + exit_code=exit_code, + success=success, + ) + + +def _is_missing_python3_error(stderr: str) -> bool: + lowered = stderr.lower() + return "python3" in lowered and ( + "not found" in lowered + or 
"command not found" in lowered + or "no such file" in lowered + ) + + +def _python3_requirement_error(operation: str, stderr: str) -> str: + return f"CUA {operation} requires python3 in the sandbox image: {stderr}" + + +def _normalize_with_python3_requirement(raw: Any, operation: str) -> ProcessResult: + proc = _normalize_process_result(raw) + if proc.stderr and _is_missing_python3_error(proc.stderr): + return ProcessResult( + stdout=proc.stdout, + stderr=_python3_requirement_error(operation, proc.stderr), + exit_code=proc.exit_code, + success=proc.success, + ) + return proc + + +async def _exec_python3_or_error( + shell: ShellComponent, + code: str, + *, + operation: str, + timeout: int | None = 30, +) -> ProcessResult: + result = await shell.exec(f"python3 - <<'PY'\n{code}\nPY", timeout=timeout) + return _normalize_with_python3_requirement(result, operation) + + +def _is_posix_os_type(os_type: str) -> bool: + return os_type.lower() in _POSIX_OS_TYPES + + +def _posix_fs_error_message(os_type: str) -> str: + return ( + "CUA filesystem shell fallback is only supported for POSIX images; " + f"os_type={os_type!r} does not support the required shell commands." 
+ ) + + +def _non_posix_filesystem_result(path: str, os_type: str) -> dict[str, Any]: + error = _posix_fs_error_message(os_type) + return {"success": False, "path": path, "error": error, "message": error} + + +def _raise_non_posix_filesystem_error(os_type: str) -> None: + raise RuntimeError(_posix_fs_error_message(os_type)) + + +def _resolve_component_method( + component: Any, + method_names: str | tuple[str, ...], +) -> Any | None: + if component is None: + return None + names = (method_names,) if isinstance(method_names, str) else method_names + for method_name in names: + method = getattr(component, method_name, None) + if method is not None: + return method + return None + + +def _missing_component_method_error( + component_name: str, + method_names: str | tuple[str, ...], +) -> RuntimeError: + names = (method_names,) if isinstance(method_names, str) else method_names + candidates = ", ".join(f"{component_name}.{name}" for name in names) + return RuntimeError( + f"CUA sandbox does not provide any of: {candidates}. " + "Please check the installed CUA SDK version and sandbox backend." 
+ ) + + +def _has_component_method(root: Any, component_name: str, method_name: str) -> bool: + component = getattr(root, component_name, None) + return getattr(component, method_name, None) is not None + + +class CuaShellComponent(ShellComponent): + def __init__(self, sandbox: Any, os_type: str = "linux") -> None: + self._sandbox = sandbox + self._os_type = os_type.lower() + shell = sandbox.shell + self._exec_raw = getattr(shell, "exec", None) or getattr(shell, "run", None) + if self._exec_raw is None: + raise RuntimeError("CUA sandbox shell must provide `.exec` or `.run`.") + + async def exec( + self, + command: str, + cwd: str | None = None, + env: dict[str, str] | None = None, + timeout: int | None = 30, + shell: bool = True, + background: bool = False, + ) -> dict[str, Any]: + if not shell: + return { + "stdout": "", + "stderr": "error: only shell mode is supported in CUA booter.", + "exit_code": 2, + "success": False, + } + + kwargs: dict[str, Any] = {} + if cwd is not None: + kwargs["cwd"] = cwd + if timeout is not None: + kwargs["timeout"] = timeout + if env: + kwargs["env"] = env + if background: + if not _is_posix_os_type(self._os_type): + return { + "stdout": "", + "stderr": "error: background shell execution is only supported for POSIX CUA images.", + "exit_code": 2, + "success": False, + } + command = _build_cua_background_command(command) + + result = await _maybe_await(self._exec_raw(command, **kwargs)) + proc = ( + _normalize_with_python3_requirement(result, "background execution") + if background + else _normalize_process_result(result) + ) + response = { + "stdout": proc.stdout, + "stderr": proc.stderr, + "exit_code": proc.exit_code, + "success": proc.success, + } + if background: + try: + response["pid"] = int(proc.stdout.strip().splitlines()[-1]) + except Exception: + response["pid"] = None + return response + + +def _build_cua_background_command(command: str) -> str: + return f"python3 -c {shlex.quote(_CUA_BACKGROUND_LAUNCHER)} 
{shlex.quote(command)}" + + +class CuaPythonComponent(PythonComponent): + def __init__(self, sandbox: Any, os_type: str = "linux") -> None: + self._sandbox = sandbox + self._os_type = os_type + python = getattr(sandbox, "python", None) + self._python_exec = None + if python is not None: + self._python_exec = getattr(python, "exec", None) or getattr( + python, "run", None + ) + + async def exec( + self, + code: str, + kernel_id: str | None = None, + timeout: int = 30, + silent: bool = False, + ) -> dict[str, Any]: + _ = kernel_id + if self._python_exec is not None: + result = await _maybe_await(self._python_exec(code, timeout=timeout)) + proc = _normalize_process_result(result) + else: + shell = CuaShellComponent(self._sandbox, os_type=self._os_type) + proc = await _exec_python3_or_error( + shell, + code, + operation="Python execution fallback", + timeout=timeout, + ) + + output_text = "" if silent else proc.stdout + error_text = proc.stderr + return { + "success": proc.success if not silent else not bool(error_text), + "data": { + "output": {"text": output_text, "images": []}, + "error": error_text, + }, + "output": output_text, + "error": error_text, + } + + +def _write_result(path: str, result: dict[str, Any]) -> dict[str, Any]: + stderr = result.get("stderr", "") + if stderr and _is_missing_python3_error(stderr): + result = { + **result, + "stderr": _python3_requirement_error("filesystem write fallback", stderr), + } + if result.get("stderr") or result.get("success") is False: + return {"success": False, "path": path, **result} + return {"success": True, "path": path, **result} + + +class CuaFileSystemComponent(FileSystemComponent): + def __init__( + self, sandbox: Any, os_type: str = CUA_DEFAULT_CONFIG["os_type"] + ) -> None: + self._shell = CuaShellComponent(sandbox, os_type=os_type) + self._fs = getattr(sandbox, "filesystem", None) + self._os_type = os_type.lower() + self._fallback = _PosixShellFileSystem(self._shell, self._os_type) + + async def create_file( 
+ self, + path: str, + content: str = "", + mode: int = 0o644, + ) -> dict[str, Any]: + write_result = await self.write_file(path, content) + if not write_result.get("success"): + return {**write_result, "mode": mode, "mode_applied": False} + return {"success": True, "path": path, "mode": mode, "mode_applied": False} + + async def read_file( + self, + path: str, + encoding: str = "utf-8", + offset: int | None = None, + limit: int | None = None, + ) -> dict[str, Any]: + read_file = None if self._fs is None else getattr(self._fs, "read_file", None) + if read_file is None: + return await self._fallback.read_file(path, encoding, offset, limit) + else: + content = await _maybe_await(read_file(path)) + if isinstance(content, bytes): + content = content.decode(encoding, errors="replace") + return { + "success": True, + "path": path, + "content": _slice_content_by_lines( + str(content), offset=offset, limit=limit + ), + } + + async def write_file( + self, + path: str, + content: str, + mode: str = "w", + encoding: str = "utf-8", + ) -> dict[str, Any]: + _ = mode + write_file = None if self._fs is None else getattr(self._fs, "write_file", None) + if write_file is None: + return await self._fallback.write_file(path, content, mode, encoding) + else: + await _maybe_await(write_file(path, content)) + return {"success": True, "path": path} + + async def delete_file(self, path: str) -> dict[str, Any]: + delete = None + if self._fs is not None: + delete = getattr(self._fs, "delete", None) or getattr( + self._fs, "delete_file", None + ) + if delete is None: + return await self._fallback.delete_file(path) + else: + await _maybe_await(delete(path)) + return {"success": True, "path": path} + + async def list_dir( + self, + path: str = ".", + show_hidden: bool = False, + ) -> dict[str, Any]: + list_dir = None if self._fs is None else getattr(self._fs, "list_dir", None) + if list_dir is not None: + entries = await _maybe_await(list_dir(path)) + return {"success": True, "path": path, 
"entries": entries} + return await self._fallback.list_dir(path, show_hidden) + + async def search_files( + self, + pattern: str, + path: str | None = None, + glob: str | None = None, + after_context: int | None = None, + before_context: int | None = None, + ) -> dict[str, Any]: + return await self._fallback.search_files( + pattern=pattern, + path=path, + glob=glob, + after_context=after_context, + before_context=before_context, + ) + + async def edit_file( + self, + path: str, + old_string: str, + new_string: str, + replace_all: bool = False, + encoding: str = "utf-8", + ) -> dict[str, Any]: + read_result = await self.read_file(path, encoding=encoding) + if not read_result.get("success"): + return read_result + content = read_result.get("content", "") + occurrences = content.count(old_string) + if occurrences == 0: + return { + "success": False, + "error": "old string not found in file", + "replacements": 0, + } + updated = content.replace(old_string, new_string, -1 if replace_all else 1) + write_result = await self.write_file(path, updated, encoding=encoding) + if not write_result.get("success"): + return write_result + return { + "success": True, + "path": path, + "replacements": occurrences if replace_all else 1, + } + + +class _PosixShellFileSystem(FileSystemComponent): + def __init__(self, shell: CuaShellComponent, os_type: str) -> None: + self._shell = shell + self._os_type = os_type.lower() + + def _ensure_posix(self, path: str) -> dict[str, Any] | None: + if _is_posix_os_type(self._os_type): + return None + return _non_posix_filesystem_result(path, self._os_type) + + async def read_file( + self, + path: str, + encoding: str = "utf-8", + offset: int | None = None, + limit: int | None = None, + ) -> dict[str, Any]: + _ = encoding + if error := self._ensure_posix(path): + return error + result = await self._shell.exec(f"cat {shlex.quote(path)}") + if result.get("stderr"): + return {"success": False, "path": path, "error": result["stderr"]} + return { + 
"success": True, + "path": path, + "content": _slice_content_by_lines( + str(result.get("stdout", "")), offset=offset, limit=limit + ), + } + + async def write_file( + self, + path: str, + content: str, + mode: str = "w", + encoding: str = "utf-8", + ) -> dict[str, Any]: + _ = mode + if error := self._ensure_posix(path): + return error + result = await _write_base64_via_shell( + self._shell, path, content.encode(encoding) + ) + return _write_result(path, result) + + async def delete_file(self, path: str) -> dict[str, Any]: + if error := self._ensure_posix(path): + return error + result = await self._shell.exec(f"rm -rf {shlex.quote(path)}") + if result.get("stderr"): + return {"success": False, "path": path, "error": result["stderr"]} + return {"success": True, "path": path} + + async def list_dir( + self, + path: str = ".", + show_hidden: bool = False, + ) -> dict[str, Any]: + if error := self._ensure_posix(path): + return error + return await _list_dir_via_shell(self._shell, path, show_hidden) + + async def search_files( + self, + pattern: str, + path: str | None = None, + glob: str | None = None, + after_context: int | None = None, + before_context: int | None = None, + ) -> dict[str, Any]: + search_path = path or "." 
+ if error := self._ensure_posix(search_path): + return error + return await search_files_via_shell( + self._shell, + pattern=pattern, + path=path, + glob=glob, + after_context=after_context, + before_context=before_context, + ) + + +async def _list_dir_via_shell( + shell: CuaShellComponent, + path: str, + show_hidden: bool, +) -> dict[str, Any]: + flags = "-1A" if show_hidden else "-1" + result = await shell.exec(f"ls {flags} {shlex.quote(path)}") + stdout = result.get("stdout", "") + return { + "success": not bool(result.get("stderr")), + "path": path, + "entries": [line for line in stdout.splitlines() if line.strip()], + "error": result.get("stderr", ""), + } + + +class CuaGUIComponent(GUIComponent): + def __init__(self, sandbox: Any) -> None: + self._sandbox = sandbox + mouse = getattr(sandbox, "mouse", None) + keyboard = getattr(sandbox, "keyboard", None) + self._click = _resolve_component_method(mouse, "click") + self._type_text = _resolve_component_method(keyboard, "type") + self._press_key = _resolve_component_method( + keyboard, ("press", "key_press", "press_key") + ) + + async def screenshot(self, path: str | None = None) -> dict[str, Any]: + raw = await self._sandbox.screenshot() + data = _screenshot_to_bytes(raw) + if path: + Path(path).parent.mkdir(parents=True, exist_ok=True) + Path(path).write_bytes(data) + return { + "success": True, + "path": path, + "mime_type": "image/png", + "base64": base64.b64encode(data).decode("ascii"), + } + + async def click(self, x: int, y: int, button: str = "left") -> dict[str, Any]: + if self._click is None: + raise _missing_component_method_error("mouse", "click") + result = await _maybe_await(self._click(x, y, button=button)) + payload = _maybe_model_dump(result) + return {"success": bool(payload.get("success", True)), **payload} + + async def type_text(self, text: str) -> dict[str, Any]: + if self._type_text is None: + raise _missing_component_method_error("keyboard", "type") + result = await 
_maybe_await(self._type_text(text)) + payload = _maybe_model_dump(result) + return {"success": bool(payload.get("success", True)), **payload} + + async def press_key(self, key: str) -> dict[str, Any]: + if self._press_key is None: + raise _missing_component_method_error( + "keyboard", ("press", "key_press", "press_key") + ) + result = await _maybe_await(self._press_key(key)) + payload = _maybe_model_dump(result) + return {"success": bool(payload.get("success", True)), **payload} + + +def _screenshot_to_bytes(raw: Any) -> bytes: + def from_str(value: str) -> bytes: + if value.startswith("data:image"): + value = value.split(",", 1)[1] + try: + return base64.b64decode(value, validate=True) + except Exception: + candidate = Path(value) + if candidate.is_file(): + return candidate.read_bytes() + return value.encode("utf-8") + + if isinstance(raw, (bytes, bytearray)): + return bytes(raw) + if isinstance(raw, str): + return from_str(raw) + if hasattr(raw, "save"): + import io + + output = io.BytesIO() + raw.save(output, format="PNG") + return output.getvalue() + payload = _maybe_model_dump(raw) + for key in ("data", "base64", "image"): + value = payload.get(key) + if value: + return _screenshot_to_bytes(value) + raise TypeError(f"Unsupported CUA screenshot result: {type(raw)!r}") + + +@dataclass(slots=True) +class _CuaRuntime: + sandbox_cm: Any + sandbox: Any + shell: CuaShellComponent + python: CuaPythonComponent + fs: CuaFileSystemComponent + gui: CuaGUIComponent | None + + +class CuaBooter(ComputerBooter): + def __init__( + self, + image: str = CUA_DEFAULT_CONFIG["image"], + os_type: str = CUA_DEFAULT_CONFIG["os_type"], + ttl: int = CUA_DEFAULT_CONFIG["ttl"], + telemetry_enabled: bool = CUA_DEFAULT_CONFIG["telemetry_enabled"], + local: bool = CUA_DEFAULT_CONFIG["local"], + api_key: str = CUA_DEFAULT_CONFIG["api_key"], + ) -> None: + self.image = image + self.os_type = os_type + self.ttl = ttl + self.telemetry_enabled = telemetry_enabled + self.local = local + 
self.api_key = api_key + self._runtime: _CuaRuntime | None = None + + async def boot(self, session_id: str) -> None: + _ = session_id + try: + from cua import Image, Sandbox + except ImportError as exc: + raise RuntimeError( + "CUA sandbox support requires the optional `cua` package. " + "Install it with `pip install cua` in the AstrBot environment." + ) from exc + + image_obj = self._build_image(Image) + ephemeral_kwargs = self._build_ephemeral_kwargs(Sandbox.ephemeral) + sandbox_cm = Sandbox.ephemeral(image_obj, **ephemeral_kwargs) + sandbox = await sandbox_cm.__aenter__() + try: + self._runtime = _CuaRuntime( + sandbox_cm=sandbox_cm, + sandbox=sandbox, + shell=CuaShellComponent(sandbox, os_type=self.os_type), + python=CuaPythonComponent(sandbox, os_type=self.os_type), + fs=CuaFileSystemComponent(sandbox, os_type=self.os_type), + gui=CuaGUIComponent(sandbox), + ) + except Exception: + await sandbox_cm.__aexit__(None, None, None) + self._runtime = None + raise + logger.info( + "[Computer] CUA sandbox booted: image=%s, os_type=%s", + self.image, + self.os_type, + ) + + def _build_image(self, image_cls: Any) -> Any: + image_name = (self.image or self.os_type or "linux").strip().lower() + factory = getattr(image_cls, image_name, None) + if callable(factory): + return factory() + os_factory = getattr(image_cls, (self.os_type or "linux").strip().lower(), None) + if callable(os_factory): + return os_factory() + return image_name + + def _build_ephemeral_kwargs(self, ephemeral: Any) -> dict[str, Any]: + try: + parameters = inspect.signature(ephemeral).parameters + except (TypeError, ValueError): + return {} + kwargs: dict[str, Any] = {} + if "ttl" in parameters: + kwargs["ttl"] = self.ttl + if "telemetry_enabled" in parameters: + kwargs["telemetry_enabled"] = self.telemetry_enabled + if "local" in parameters: + kwargs["local"] = self.local + if "api_key" in parameters and self.api_key: + kwargs["api_key"] = self.api_key + return kwargs + + async def shutdown(self) -> 
None: + if self._runtime is not None: + await self._runtime.sandbox_cm.__aexit__(None, None, None) + self._runtime = None + + @property + def capabilities(self) -> tuple[str, ...] | None: + capabilities = ["python", "shell", "filesystem"] + if self._runtime is None: + return tuple(capabilities) + + sandbox = self._runtime.sandbox + has_screenshot = getattr(sandbox, "screenshot", None) is not None + has_mouse = _has_component_method(sandbox, "mouse", "click") + has_keyboard = _has_component_method(sandbox, "keyboard", "type") + if has_screenshot or has_mouse or has_keyboard: + capabilities.append("gui") + if has_screenshot: + capabilities.append("screenshot") + if has_mouse: + capabilities.append("mouse") + if has_keyboard: + capabilities.append("keyboard") + return tuple(capabilities) + + @property + def fs(self) -> FileSystemComponent: + if self._runtime is None: + raise RuntimeError("CuaBooter is not initialized.") + return self._runtime.fs + + @property + def python(self) -> PythonComponent: + if self._runtime is None: + raise RuntimeError("CuaBooter is not initialized.") + return self._runtime.python + + @property + def shell(self) -> ShellComponent: + if self._runtime is None: + raise RuntimeError("CuaBooter is not initialized.") + return self._runtime.shell + + @property + def gui(self) -> GUIComponent | None: + return None if self._runtime is None else self._runtime.gui + + async def upload_file(self, path: str, file_name: str) -> dict: + local_path = Path(path) + if not local_path.is_file(): + return {"success": False, "error": f"File not found: {path}"} + sandbox = None if self._runtime is None else self._runtime.sandbox + if sandbox is not None and hasattr(sandbox, "upload_file"): + return _maybe_model_dump( + await sandbox.upload_file(str(local_path), file_name) + ) + if not _is_posix_os_type(self.os_type): + return _non_posix_filesystem_result(file_name, self.os_type) + result = await _write_base64_via_shell( + self.shell, file_name, 
local_path.read_bytes() + ) + return { + "success": not bool(result.get("stderr")), + "file_path": file_name, + **result, + } + + async def download_file(self, remote_path: str, local_path: str) -> None: + sandbox = None if self._runtime is None else self._runtime.sandbox + if sandbox is not None and hasattr(sandbox, "download_file"): + await sandbox.download_file(remote_path, local_path) + return + if not _is_posix_os_type(self.os_type): + _raise_non_posix_filesystem_error(self.os_type) + result = await self.shell.exec(f"base64 {shlex.quote(remote_path)}") + if result.get("stderr"): + raise RuntimeError(result["stderr"]) + Path(local_path).parent.mkdir(parents=True, exist_ok=True) + Path(local_path).write_bytes(base64.b64decode(result.get("stdout", ""))) + + async def available(self) -> bool: + return self._runtime is not None diff --git a/astrbot/core/computer/booters/cua_defaults.py b/astrbot/core/computer/booters/cua_defaults.py new file mode 100644 index 0000000000..4c506154ad --- /dev/null +++ b/astrbot/core/computer/booters/cua_defaults.py @@ -0,0 +1,17 @@ +CUA_DEFAULT_CONFIG = { + "image": "linux", + "os_type": "linux", + "ttl": 3600, + "telemetry_enabled": False, + "local": True, + "api_key": "", +} + +CUA_CONFIG_KEYS = { + "image": "cua_image", + "os_type": "cua_os_type", + "ttl": "cua_ttl", + "telemetry_enabled": "cua_telemetry_enabled", + "local": "cua_local", + "api_key": "cua_api_key", +} diff --git a/astrbot/core/computer/computer_client.py b/astrbot/core/computer/computer_client.py index 715f938679..3ee65ce1aa 100644 --- a/astrbot/core/computer/computer_client.py +++ b/astrbot/core/computer/computer_client.py @@ -484,6 +484,15 @@ async def get_booter( profile=profile, ttl=ttl, ) + elif booter_type == "cua": + from .booters.cua import CuaBooter, build_cua_booter_kwargs + + cua_kwargs = build_cua_booter_kwargs(sandbox_cfg) + logger.info( + f"[Computer] CUA config: image={cua_kwargs['image']}, " + f"os_type={cua_kwargs['os_type']}, 
ttl={cua_kwargs['ttl']}" + ) + client = CuaBooter(**cua_kwargs) elif booter_type == "boxlite": from .booters.boxlite import BoxliteBooter @@ -499,6 +508,14 @@ async def get_booter( await _sync_skills_to_sandbox(client) except Exception as e: logger.error(f"Error booting sandbox for session {session_id}: {e}") + try: + await client.shutdown() + except Exception as shutdown_error: + logger.warning( + "Failed to shutdown sandbox after boot error for session %s: %s", + session_id, + shutdown_error, + ) raise e session_booter[session_id] = client diff --git a/astrbot/core/computer/olayer/__init__.py b/astrbot/core/computer/olayer/__init__.py index e2348671eb..f446c7dde7 100644 --- a/astrbot/core/computer/olayer/__init__.py +++ b/astrbot/core/computer/olayer/__init__.py @@ -1,5 +1,6 @@ from .browser import BrowserComponent from .filesystem import FileSystemComponent +from .gui import GUIComponent from .python import PythonComponent from .shell import ShellComponent @@ -8,4 +9,5 @@ "ShellComponent", "FileSystemComponent", "BrowserComponent", + "GUIComponent", ] diff --git a/astrbot/core/computer/olayer/gui.py b/astrbot/core/computer/olayer/gui.py new file mode 100644 index 0000000000..cc23b9d7af --- /dev/null +++ b/astrbot/core/computer/olayer/gui.py @@ -0,0 +1,25 @@ +""" +GUI automation component. +""" + +from typing import Any, Protocol + + +class GUIComponent(Protocol): + """Desktop GUI operations component.""" + + async def screenshot(self, path: str | None = None) -> dict[str, Any]: + """Capture a screenshot, optionally saving it to path.""" + ... + + async def click(self, x: int, y: int, button: str = "left") -> dict[str, Any]: + """Click at screen coordinates.""" + ... + + async def type_text(self, text: str) -> dict[str, Any]: + """Type text into the active UI target.""" + ... + + async def press_key(self, key: str) -> dict[str, Any]: + """Press a keyboard key or shortcut.""" + ... 
diff --git a/astrbot/core/config/astrbot_config.py b/astrbot/core/config/astrbot_config.py index 77c298cac8..1dd222929c 100644 --- a/astrbot/core/config/astrbot_config.py +++ b/astrbot/core/config/astrbot_config.py @@ -103,8 +103,7 @@ def check_config_integrity(self, refer_conf: dict, conf: dict, path=""): for key, value in refer_conf.items(): if key not in conf: # 配置项不存在,插入默认值 - path_ = path + "." + key if path else key - logger.info(f"检查到配置项 {path_} 不存在,已插入默认值 {value}") + logger.info("检查到配置项不存在,已插入默认值") new_conf[key] = value has_new = True elif conf[key] is None: @@ -133,16 +132,12 @@ def check_config_integrity(self, refer_conf: dict, conf: dict, path=""): # 检查是否存在参考配置中没有的配置项 for key in list(conf.keys()): if key not in refer_conf: - path_ = path + "." + key if path else key - logger.info(f"检查到配置项 {path_} 不存在,将从当前配置中删除") + logger.info("检查到未知配置项,将从当前配置中删除") has_new = True # 顺序不一致也算作变更 if list(conf.keys()) != list(new_conf.keys()): - if path: - logger.info(f"检查到配置项 {path} 的子项顺序不一致,已重新排序") - else: - logger.info("检查到配置项顺序不一致,已重新排序") + logger.info("检查到配置项顺序不一致,已重新排序") has_new = True # 更新原始配置 diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 9b27d482e9..64243a82f5 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -3,6 +3,7 @@ import os from typing import Any, TypedDict +from astrbot.core.computer.booters.cua_defaults import CUA_DEFAULT_CONFIG from astrbot.core.utils.astrbot_path import get_astrbot_data_path VERSION = "4.23.6" @@ -175,6 +176,12 @@ "shipyard_neo_access_token": "", "shipyard_neo_profile": "python-default", "shipyard_neo_ttl": 3600, + "cua_image": CUA_DEFAULT_CONFIG["image"], + "cua_os_type": CUA_DEFAULT_CONFIG["os_type"], + "cua_ttl": CUA_DEFAULT_CONFIG["ttl"], + "cua_telemetry_enabled": CUA_DEFAULT_CONFIG["telemetry_enabled"], + "cua_local": CUA_DEFAULT_CONFIG["local"], + "cua_api_key": CUA_DEFAULT_CONFIG["api_key"], }, "image_compress_enabled": True, "image_compress_options": { @@ -3289,8 
+3296,8 @@ class ChatProviderTemplate(TypedDict): "provider_settings.sandbox.booter": { "description": "沙箱环境驱动器", "type": "string", - "options": ["shipyard_neo", "shipyard"], - "labels": ["Shipyard Neo", "Shipyard"], + "options": ["shipyard_neo", "shipyard", "cua"], + "labels": ["Shipyard Neo", "Shipyard", "CUA"], "condition": { "provider_settings.computer_use_runtime": "sandbox", }, @@ -3331,6 +3338,64 @@ class ChatProviderTemplate(TypedDict): "provider_settings.sandbox.booter": "shipyard_neo", }, }, + "provider_settings.sandbox.cua_image": { + "description": "CUA Image", + "type": "string", + "hint": "CUA 沙箱镜像/系统类型,默认 linux。可填写 linux、macos、windows、android,具体取决于 CUA SDK 支持。", + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", + }, + }, + "provider_settings.sandbox.cua_os_type": { + "description": "CUA OS Type", + "type": "string", + "options": ["linux", "macos", "windows", "android"], + "labels": ["Linux", "macOS", "Windows", "Android"], + "hint": "CUA 沙箱操作系统类型,默认 linux。", + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", + }, + }, + "provider_settings.sandbox.cua_ttl": { + "description": "CUA Sandbox TTL", + "type": "int", + "hint": "CUA 沙箱生存时间(秒)。当前作为会话配置保存,具体生效取决于 CUA SDK。", + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", + }, + }, + "provider_settings.sandbox.cua_telemetry_enabled": { + "description": "CUA Telemetry", + "type": "bool", + "hint": "是否允许 CUA SDK 发送遥测数据。默认关闭。", + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", + }, + }, + "provider_settings.sandbox.cua_local": { + "description": "CUA Local Sandbox", + "type": "bool", + "hint": "是否优先使用 CUA 本地沙箱。默认开启,避免云端沙箱要求 CUA_API_KEY。关闭后可使用 CUA 云端沙箱。", + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + 
"provider_settings.sandbox.booter": "cua", + }, + }, + "provider_settings.sandbox.cua_api_key": { + "description": "CUA API Key", + "type": "string", + "hint": "CUA 云端沙箱 API Key。仅在关闭本地沙箱时需要。也可以通过 CUA_API_KEY 环境变量提供。", + "obvious_hint": True, + "condition": { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", + "provider_settings.sandbox.cua_local": False, + }, + }, "provider_settings.sandbox.shipyard_endpoint": { "description": "Shipyard API Endpoint", "type": "string", diff --git a/astrbot/core/message/message_event_result.py b/astrbot/core/message/message_event_result.py index 0965fe7f7f..72dc481a23 100644 --- a/astrbot/core/message/message_event_result.py +++ b/astrbot/core/message/message_event_result.py @@ -27,9 +27,25 @@ class MessageChain: chain: list[BaseMessageComponent] = field(default_factory=list) use_t2i_: bool | None = None # None 为跟随用户设置 + use_markdown_: bool | None = ( + None # 是否使用 Markdown 发送消息。None 跟随平台默认,True 强制 Markdown,False 强制纯文本。 + ) type: str | None = None """消息链承载的消息的类型。可选,用于让消息平台区分不同业务场景的消息链。""" + def derive(self, chain: list[BaseMessageComponent] | None = None) -> "MessageChain": + """基于当前消息链创建一个新的 MessageChain,继承元数据(use_t2i_、use_markdown_ 等)。 + + Args: + chain: 新消息链的组件列表。如果为 None,则使用空列表。 + + """ + new = MessageChain(chain=chain if chain is not None else []) + new.use_t2i_ = self.use_t2i_ + new.use_markdown_ = self.use_markdown_ + new.type = self.type + return new + def message(self, message: str): """添加一条文本消息到消息链 `chain` 中。 @@ -118,6 +134,18 @@ def use_t2i(self, use_t2i: bool): self.use_t2i_ = use_t2i return self + def use_markdown(self, use: bool | None = True): + """设置是否使用 Markdown 发送消息。 + + 仅对支持 Markdown 的平台生效(如 QQ Official),不支持的平台会忽略此字段。 + + Args: + use: True 强制使用 Markdown,False 强制纯文本,None 跟随平台默认行为。 + + """ + self.use_markdown_ = use + return self + def get_plain_text(self, with_other_comps_mark: bool = False) -> str: """获取纯文本消息。这个方法将获取 chain 中所有 Plain 组件的文本并拼接成一条消息。空格分隔。 diff --git 
a/astrbot/core/pipeline/respond/stage.py b/astrbot/core/pipeline/respond/stage.py index aea6a74b3e..604f1ded0e 100644 --- a/astrbot/core/pipeline/respond/stage.py +++ b/astrbot/core/pipeline/respond/stage.py @@ -246,9 +246,9 @@ async def process( await asyncio.sleep(i) try: if comp.type in need_separately: - await event.send(MessageChain([comp])) + await event.send(result.derive([comp])) else: - await event.send(MessageChain([*header_comps, comp])) + await event.send(result.derive([*header_comps, comp])) header_comps.clear() except Exception as e: logger.error( @@ -271,7 +271,7 @@ async def process( modify_raw_chain=True, ) for comp in sep_comps: - chain = MessageChain([comp]) + chain = result.derive([comp]) try: await event.send(chain) except Exception as e: @@ -279,7 +279,7 @@ async def process( f"发送消息链失败: chain = {chain}, error = {e}", exc_info=True, ) - chain = MessageChain(result.chain) + chain = result.derive(result.chain) if result.chain and len(result.chain) > 0: try: await event.send(chain) diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py index 59ef6f0564..fa10d28767 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py @@ -235,12 +235,20 @@ async def _post_send(self, stream: dict | None = None): ): plain_text = plain_text + "\n" - payload: dict = { - # "content": plain_text, - "markdown": MarkdownPayload(content=plain_text) if plain_text else None, - "msg_type": 2, - "msg_id": self.message_obj.message_id, - } + # 根据消息链的 use_markdown_ 标记决定发送模式 + use_md = getattr(self.send_buffer, "use_markdown_", None) + if use_md is False: + payload: dict = { + "content": plain_text, + "msg_type": 0, + "msg_id": self.message_obj.message_id, + } + else: + payload = { + "markdown": MarkdownPayload(content=plain_text) if plain_text else None, + "msg_type": 2, + "msg_id": 
self.message_obj.message_id, + } if not isinstance(source, botpy.message.Message | botpy.message.DirectMessage): payload["msg_seq"] = random.randint(1, 10000) diff --git a/astrbot/core/tools/computer_tools/__init__.py b/astrbot/core/tools/computer_tools/__init__.py index 7e364ffd23..f90c2e1de8 100644 --- a/astrbot/core/tools/computer_tools/__init__.py +++ b/astrbot/core/tools/computer_tools/__init__.py @@ -1,3 +1,8 @@ +from .cua import ( + CuaKeyboardTypeTool, + CuaMouseClickTool, + CuaScreenshotTool, +) from .fs import ( FileDownloadTool, FileEditTool, @@ -32,6 +37,9 @@ "BrowserExecTool", "CreateSkillCandidateTool", "CreateSkillPayloadTool", + "CuaKeyboardTypeTool", + "CuaMouseClickTool", + "CuaScreenshotTool", "EvaluateSkillCandidateTool", "ExecuteShellTool", "FileDownloadTool", diff --git a/astrbot/core/tools/computer_tools/cua.py b/astrbot/core/tools/computer_tools/cua.py new file mode 100644 index 0000000000..7b37a55086 --- /dev/null +++ b/astrbot/core/tools/computer_tools/cua.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +import json +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import mcp + +from astrbot.api import FunctionTool +from astrbot.core.agent.run_context import ContextWrapper +from astrbot.core.agent.tool import ToolExecResult +from astrbot.core.astr_agent_context import AstrAgentContext +from astrbot.core.computer.computer_client import get_booter +from astrbot.core.message.message_event_result import MessageChain +from astrbot.core.tools.computer_tools.util import check_admin_permission +from astrbot.core.tools.registry import builtin_tool +from astrbot.core.utils.astrbot_path import get_astrbot_temp_path + +_CUA_TOOL_CONFIG = { + "provider_settings.computer_use_runtime": "sandbox", + "provider_settings.sandbox.booter": "cua", +} + + +def _to_json(data: Any) -> str: + return json.dumps(data, ensure_ascii=False, default=str) + + +def _exception_detail(error: Exception) -> 
# Built-in tool exposing the CUA sandbox screenshot capability to the agent.
# It is registered only when the sandbox runtime with the "cua" booter is
# selected (see _CUA_TOOL_CONFIG).
@builtin_tool(config=_CUA_TOOL_CONFIG)
@dataclass
class CuaScreenshotTool(FunctionTool):
    name: str = "astrbot_cua_screenshot"
    description: str = (
        "Capture a screenshot from the CUA sandbox and optionally send it to the user."
    )
    parameters: dict = field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "send_to_user": {
                    "type": "boolean",
                    "description": "Whether to send the screenshot image to the current conversation.",
                    "default": True,
                },
                "return_image_to_llm": {
                    "type": "boolean",
                    "description": "Whether to include the screenshot image content in the tool result for model inspection.",
                    "default": True,
                },
            },
        }
    )

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        send_to_user: bool = True,
        return_image_to_llm: bool = True,
    ) -> ToolExecResult:
        """Capture a screenshot and return its metadata (and optionally the image).

        Args:
            context: Agent run context; used for the permission check, to
                resolve the sandbox GUI component and to send messages.
            send_to_user: When true, the saved screenshot file is sent to the
                current conversation.
            return_image_to_llm: When true, the image is attached to the tool
                result as an image content part, provided image data is
                actually available.

        Returns:
            The permission error produced by ``check_admin_permission`` if the
            caller is not allowed, a ``CallToolResult`` on success, or an
            error string if anything raised while taking the screenshot.
        """
        # Function-scope import: only needed for the file fallback below.
        import base64

        if err := check_admin_permission(context, "Taking CUA screenshots"):
            return err
        try:
            gui = await _get_gui_component(context)
            path = _new_screenshot_path(context.context.event.unified_msg_origin)
            result = await gui.screenshot(path)
            payload = {"success": True, **result, "path": path}
            if send_to_user:
                await context.context.event.send(MessageChain().file_image(path))
                payload["sent_to_user"] = True

            # The raw image is stripped from the JSON text part so the text
            # result stays small; it is attached separately below.
            raw_image = payload.pop("base64", "")
            image_data = str(raw_image) if raw_image else ""
            if return_image_to_llm and not image_data:
                # The GUI component did not return inline image data. Fall
                # back to the file it was asked to write instead of attaching
                # an empty image part.
                try:
                    image_data = base64.b64encode(Path(path).read_bytes()).decode(
                        "ascii"
                    )
                except OSError:
                    image_data = ""

            content: list[mcp.types.TextContent | mcp.types.ImageContent] = [
                mcp.types.TextContent(type="text", text=_to_json(payload))
            ]
            # Never attach an image part with empty data.
            if return_image_to_llm and image_data:
                content.append(
                    mcp.types.ImageContent(
                        type="image",
                        data=image_data,
                        mimeType=str(payload.get("mime_type", "image/png")),
                    )
                )
            return mcp.types.CallToolResult(content=content)
        except Exception as e:
            return f"Error taking CUA screenshot: {_exception_detail(e)}"
+ parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to type."}, + }, + "required": ["text"], + } + ) + + async def call( + self, + context: ContextWrapper[AstrAgentContext], + text: str, + ) -> ToolExecResult: + if err := check_admin_permission(context, "Using CUA keyboard"): + return err + try: + gui = await _get_gui_component(context) + return _to_json(await gui.type_text(text)) + except Exception as e: + return f"Error typing in CUA desktop: {_exception_detail(e)}" + + +def _new_screenshot_path(umo: str) -> str: + safe_prefix = uuid.uuid5(uuid.NAMESPACE_DNS, umo).hex[:12] + screenshot_dir = Path(get_astrbot_temp_path()) / "cua_screenshots" + screenshot_dir.mkdir(parents=True, exist_ok=True) + return str(screenshot_dir / f"{safe_prefix}-{uuid.uuid4().hex}.png") diff --git a/astrbot/core/tools/computer_tools/shell.py b/astrbot/core/tools/computer_tools/shell.py index af933e83b1..cdefe97a0e 100644 --- a/astrbot/core/tools/computer_tools/shell.py +++ b/astrbot/core/tools/computer_tools/shell.py @@ -1,5 +1,7 @@ import json +import shlex from dataclasses import dataclass, field +from typing import Any from astrbot.api import FunctionTool from astrbot.core.agent.run_context import ContextWrapper @@ -49,7 +51,7 @@ async def call( context: ContextWrapper[AstrAgentContext], command: str, background: bool = False, - env: dict = {}, + env: dict[str, Any] | None = None, ) -> ToolExecResult: if permission_error := check_admin_permission(context, "Shell execution"): return permission_error @@ -67,12 +69,38 @@ async def call( current_workspace_root.mkdir(parents=True, exist_ok=True) cwd = str(current_workspace_root) + env = dict(env or {}) + effective_background = background and not _is_self_detached_command(command) result = await sb.shell.exec( command, cwd=cwd, - background=background, + background=effective_background, env=env, ) return json.dumps(result, ensure_ascii=False) except 
Exception as e: - return f"Error executing command: {str(e)}" + detail = str(e) or type(e).__name__ + return f"Error executing command: {detail}" + + +def _is_self_detached_command(command: str) -> bool: + lex = shlex.shlex(command, posix=False) + lex.whitespace_split = True + lex.commenters = "" + try: + tokens = list(lex) + except ValueError: + return False + comment_index = next( + (index for index, token in enumerate(tokens) if token.startswith("#")), + None, + ) + if comment_index is not None: + tokens = tokens[:comment_index] + if not tokens: + return False + + first = tokens[0].lower() + if first in {"nohup", "setsid", "disown", "start", "start-process"}: + return True + return tokens[-1] == "&" diff --git a/dashboard/src/components/chat/ChatInput.vue b/dashboard/src/components/chat/ChatInput.vue index ea08540ae2..c966d13777 100644 --- a/dashboard/src/components/chat/ChatInput.vue +++ b/dashboard/src/components/chat/ChatInput.vue @@ -112,6 +112,10 @@ ref="inputField" v-model="localPrompt" @keydown="handleKeyDown" + @compositionstart="handleCompositionStart" + @compositionend="handleCompositionEnd" + @compositioncancel="handleCompositionEnd" + @blur="clearCompositionState()" :disabled="disabled" placeholder="Ask AstrBot..." 
class="chat-textarea" @@ -307,6 +311,7 @@ import { import { useDisplay } from "vuetify"; import { useModuleI18n } from "@/i18n/composables"; import { useCustomizerStore } from "@/stores/customizer"; +import { isComposingEnter } from "@/utils/imeInput.mjs"; import ConfigSelector from "./ConfigSelector.vue"; import ProviderModelMenu from "./ProviderModelMenu.vue"; import StyledMenu from "@/components/shared/StyledMenu.vue"; @@ -379,6 +384,8 @@ const providerModelMenuRef = ref | null>( const showProviderSelector = ref(true); const isReplyClosing = ref(false); const isDragging = ref(false); +const isComposing = ref(false); +const lastCompositionEndAt = ref(null); let dragLeaveTimeout: number | null = null; const localPrompt = computed({ @@ -514,6 +521,10 @@ function handleKeyDown(e: KeyboardEvent) { return; } + if (isComposingEnter(e, isComposing.value, lastCompositionEndAt.value)) { + return; + } + const isSendHotkey = e.ctrlKey || e.metaKey || @@ -533,6 +544,23 @@ function handleKeyDown(e: KeyboardEvent) { } } +function handleCompositionStart() { + isComposing.value = true; + lastCompositionEndAt.value = null; +} + +function handleCompositionEnd(e: CompositionEvent) { + lastCompositionEndAt.value = e.timeStamp; + clearCompositionState({ keepLastEndAt: true }); +} + +function clearCompositionState({ keepLastEndAt = false } = {}) { + isComposing.value = false; + if (!keepLastEndAt) { + lastCompositionEndAt.value = null; + } +} + function handleKeyUp(e: KeyboardEvent) { if (e.keyCode === 66) { ctrlKeyDown.value = false; @@ -634,6 +662,7 @@ onBeforeUnmount(() => { if (inputField.value) { inputField.value.removeEventListener("paste", handlePaste); } + clearCompositionState(); document.removeEventListener("keyup", handleKeyUp); }); diff --git a/dashboard/src/components/extension/componentPanel/components/CommandTable.vue b/dashboard/src/components/extension/componentPanel/components/CommandTable.vue index 32eebb746b..be2ae9892d 100644 --- 
a/dashboard/src/components/extension/componentPanel/components/CommandTable.vue +++ b/dashboard/src/components/extension/componentPanel/components/CommandTable.vue @@ -141,7 +141,7 @@ const getRowProps = ({ item }: { item: CommandItem }) => { diff --git a/dashboard/src/components/extension/componentPanel/components/ToolTable.vue b/dashboard/src/components/extension/componentPanel/components/ToolTable.vue index 9615e5ea8c..8a42f33fef 100644 --- a/dashboard/src/components/extension/componentPanel/components/ToolTable.vue +++ b/dashboard/src/components/extension/componentPanel/components/ToolTable.vue @@ -121,7 +121,7 @@ const enabledConfigTags = (tool: ToolItem): BuiltinToolConfigTag[] => { @@ -133,7 +133,7 @@ const enabledConfigTags = (tool: ToolItem): BuiltinToolConfigTag[] => { diff --git a/dashboard/src/i18n/locales/en-US/features/config-metadata.json b/dashboard/src/i18n/locales/en-US/features/config-metadata.json index 4f35dd2859..c0796b7f07 100644 --- a/dashboard/src/i18n/locales/en-US/features/config-metadata.json +++ b/dashboard/src/i18n/locales/en-US/features/config-metadata.json @@ -186,6 +186,30 @@ "description": "Shipyard Neo Sandbox TTL", "hint": "Sandbox time-to-live in seconds." }, + "cua_image": { + "description": "CUA Image", + "hint": "CUA sandbox image or OS type. Defaults to linux. Supported values depend on the installed CUA SDK." + }, + "cua_os_type": { + "description": "CUA OS Type", + "hint": "CUA sandbox operating system type. Defaults to linux." + }, + "cua_ttl": { + "description": "CUA Sandbox TTL", + "hint": "CUA sandbox time-to-live in seconds. Actual behavior depends on the installed CUA SDK." + }, + "cua_telemetry_enabled": { + "description": "CUA Telemetry", + "hint": "Allow the CUA SDK to send telemetry data. Disabled by default." + }, + "cua_local": { + "description": "CUA Local Sandbox", + "hint": "Prefer a local CUA sandbox. Enabled by default to avoid requiring CUA_API_KEY for cloud sandboxes. 
Disable this to use CUA cloud sandboxes." + }, + "cua_api_key": { + "description": "CUA API Key", + "hint": "CUA cloud sandbox API key. Required only when local sandbox is disabled. You can also provide it via the CUA_API_KEY environment variable." + }, "shipyard_endpoint": { "description": "Shipyard API Endpoint", "hint": "API access address for Shipyard service." diff --git a/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json b/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json index 08d11aed6a..2f62db65ab 100644 --- a/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json +++ b/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json @@ -186,6 +186,30 @@ "description": "TTL песочницы Shipyard Neo", "hint": "Время жизни песочницы в секундах." }, + "cua_image": { + "description": "Образ CUA", + "hint": "Образ или тип ОС песочницы CUA. По умолчанию linux. Поддерживаемые значения зависят от установленного CUA SDK." + }, + "cua_os_type": { + "description": "Тип ОС CUA", + "hint": "Тип операционной системы песочницы CUA. По умолчанию linux." + }, + "cua_ttl": { + "description": "TTL песочницы CUA", + "hint": "Время жизни песочницы CUA в секундах. Фактическое поведение зависит от установленного CUA SDK." + }, + "cua_telemetry_enabled": { + "description": "Телеметрия CUA", + "hint": "Разрешить CUA SDK отправлять телеметрию. По умолчанию выключено." + }, + "cua_local": { + "description": "Локальная песочница CUA", + "hint": "Предпочитать локальную песочницу CUA. Включено по умолчанию, чтобы не требовать CUA_API_KEY для облачных песочниц. Отключите для использования облачных песочниц CUA." + }, + "cua_api_key": { + "description": "CUA API Key", + "hint": "API key для облачной песочницы CUA. Требуется только если локальная песочница отключена. Также можно передать через переменную окружения CUA_API_KEY." + }, "shipyard_endpoint": { "description": "Эндпоинт Shipyard API", "hint": "Адрес API для доступа к сервису Shipyard." 
diff --git a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json index 8495f9ba1a..407e9f9f45 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json +++ b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json @@ -188,6 +188,30 @@ "description": "Shipyard Neo Sandbox 存活时间(秒)", "hint": "Shipyard Neo 沙箱的生存时间(秒)。" }, + "cua_image": { + "description": "CUA 镜像", + "hint": "CUA 沙箱镜像/系统类型,默认 linux。可填写 linux、macos、windows、android,具体取决于 CUA SDK 支持。" + }, + "cua_os_type": { + "description": "CUA 操作系统类型", + "hint": "CUA 沙箱操作系统类型,默认 linux。" + }, + "cua_ttl": { + "description": "CUA Sandbox 存活时间(秒)", + "hint": "CUA 沙箱生存时间(秒)。当前作为会话配置保存,具体生效取决于 CUA SDK。" + }, + "cua_telemetry_enabled": { + "description": "CUA 遥测", + "hint": "是否允许 CUA SDK 发送遥测数据。默认关闭。" + }, + "cua_local": { + "description": "CUA 本地沙箱", + "hint": "是否优先使用 CUA 本地沙箱。默认开启,避免云端沙箱要求 CUA_API_KEY。关闭后可使用 CUA 云端沙箱。" + }, + "cua_api_key": { + "description": "CUA API Key", + "hint": "CUA 云端沙箱 API Key。仅在关闭本地沙箱时需要。也可以通过 CUA_API_KEY 环境变量提供。" + }, "shipyard_endpoint": { "description": "Shipyard API Endpoint", "hint": "Shipyard 服务的 API 访问地址。" diff --git a/dashboard/src/utils/imeInput.mjs b/dashboard/src/utils/imeInput.mjs new file mode 100644 index 0000000000..3c24ffbae8 --- /dev/null +++ b/dashboard/src/utils/imeInput.mjs @@ -0,0 +1,31 @@ +// Some IMEs emit Enter right after compositionend; treat that same-keystroke +// window as composition so selecting a candidate does not send the message. 
+const RECENT_COMPOSITION_END_THRESHOLD_MS = 100; + +/** + * @param {KeyboardEvent} event + * @param {boolean} compositionActive + * @param {number | null} lastCompositionEndAt + */ +export function isComposingEnter( + event, + compositionActive, + lastCompositionEndAt = null, +) { + const hasLegacyCompositionKeyCode = + typeof event.keyCode === "number" && event.keyCode === 229; + const isAfterRecentCompositionEnd = + typeof event.timeStamp === "number" && + typeof lastCompositionEndAt === "number" && + event.timeStamp >= lastCompositionEndAt && + event.timeStamp - lastCompositionEndAt < + RECENT_COMPOSITION_END_THRESHOLD_MS; + + return ( + event.key === "Enter" && + (compositionActive || + event.isComposing || + hasLegacyCompositionKeyCode || + isAfterRecentCompositionEnd) + ); +} diff --git a/dashboard/tests/imeInput.test.mjs b/dashboard/tests/imeInput.test.mjs new file mode 100644 index 0000000000..1c5fea4731 --- /dev/null +++ b/dashboard/tests/imeInput.test.mjs @@ -0,0 +1,36 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { isComposingEnter } from "../src/utils/imeInput.mjs"; + +test("detects Enter while an IME composition is active", () => { + assert.equal(isComposingEnter({ key: "Enter", isComposing: true }, false), true); + assert.equal(isComposingEnter({ key: "Enter", isComposing: false }, true), true); +}); + +test("does not treat normal Enter as IME composition", () => { + assert.equal(isComposingEnter({ key: "Enter", isComposing: false }, false), false); + assert.equal(isComposingEnter({ key: "a", isComposing: true }, true), false); +}); + +test("detects Enter fired immediately after composition ended", () => { + assert.equal( + isComposingEnter( + { key: "Enter", isComposing: false, timeStamp: 105 }, + false, + 100, + ), + true, + ); +}); + +test("does not treat delayed Enter after composition ended as IME composition", () => { + assert.equal( + isComposingEnter( + { key: "Enter", isComposing: false, timeStamp: 250 
}, + false, + 100, + ), + false, + ); +}); diff --git a/docs/zh/use/astrbot-agent-sandbox.md b/docs/zh/use/astrbot-agent-sandbox.md index 41a779e38b..ff59c6a7bd 100644 --- a/docs/zh/use/astrbot-agent-sandbox.md +++ b/docs/zh/use/astrbot-agent-sandbox.md @@ -13,11 +13,12 @@ - `Shipyard Neo`(当前推荐) - `Shipyard`(旧方案,仍可继续使用) +- `CUA`(本地或云端电脑使用沙盒,适合需要桌面操作的场景) 在当前版本的 AstrBot 控制台中,可在“AI 配置” -> “Agent Computer Use”中选择: - `Computer Use Runtime` = `sandbox` -- `沙箱环境驱动器` = `Shipyard Neo` 或 `Shipyard` +- `沙箱环境驱动器` = `Shipyard Neo`、`Shipyard` 或 `CUA` 其中,`Shipyard Neo` 是当前默认驱动器。它由 Bay、Ship、Gull 三部分组成: @@ -30,6 +31,109 @@ > [!TIP] > `Shipyard Neo` 下浏览器能力并不是所有 profile 都有。只有 profile 支持 `browser` capability 时,AstrBot 才会挂载浏览器相关工具。典型 profile 如 `browser-python`。 +## CUA 运行时 + +`CUA` 是一个面向电脑使用(Computer Use)的沙盒运行时。它可以通过统一的 Python SDK 创建 Linux、macOS、Windows、Android 等不同类型的沙盒,并暴露 Shell、截图、鼠标、键盘、文件系统等接口。 + +在 AstrBot 中选择 `CUA` 驱动器后,Agent 可以在 CUA sandbox 中使用: + +- Shell 工具 +- Python 工具 +- 文件读取、写入、编辑和搜索工具 +- 截图工具 +- 鼠标点击工具 +- 键盘输入工具 +- 沙盒文件上传与下载工具 + +> [!NOTE] +> CUA 是可选运行时,AstrBot 默认安装不会强制安装它。如果选择了 `CUA` 但当前 Python 环境没有安装 `cua` 包,启动沙盒时会提示安装缺失。 + +### 安装 CUA 依赖 + +如果您通过源码或虚拟环境运行 AstrBot,请在 AstrBot 使用的 Python 环境中安装 CUA: + +```bash +pip install cua +``` + +如果您使用 `uv` 管理 AstrBot 环境,可在 AstrBot 项目目录中执行: + +```bash +uv pip install cua +``` + +CUA 本身还依赖具体运行方式: + +- 本地 Linux 容器通常需要 Docker 可用。 +- 本地 Linux/Windows VM 通常需要 QEMU 或 CUA 对应的本地运行时。 +- macOS VM 通常依赖 CUA/Lume 相关运行时。 +- 云端 CUA 需要可用的 CUA API Key。 + +具体宿主机要求、镜像支持情况和本地运行时安装方式,请参考 [CUA 官方文档](https://cua.ai/docs)。 + +### 在 AstrBot 中配置 CUA + +进入 WebUI: + +- `配置 -> 普通配置 -> 使用电脑能力` + +然后设置: + +- `Computer Use Runtime` = `sandbox` +- `沙箱环境驱动器` = `CUA` + +CUA 相关配置项包括: + +- `CUA Image`:要启动的 CUA 镜像。常见值为 `linux`、`macos`、`windows`、`android`。默认 `linux`。 +- `CUA OS Type`:镜像的操作系统类型。默认 `linux`。它会影响 AstrBot 对 POSIX Shell fallback 的判断。 +- `CUA Sandbox TTL`:沙盒生命周期,单位为秒。默认 `3600`。 +- `CUA Telemetry Enabled`:是否启用 CUA 侧遥测。默认关闭。 +- `CUA Local 
Runtime`:是否使用本地运行时。默认开启。关闭后会按 CUA SDK 的云端方式创建沙盒。 +- `CUA API Key`:云端 CUA 所需的 API Key。仅在使用云端运行时时填写。 + +一个最小本地 Linux 容器配置通常是: + +```text +Computer Use Runtime = sandbox +沙箱环境驱动器 = CUA +CUA Image = linux +CUA OS Type = linux +CUA Local Runtime = true +CUA Sandbox TTL = 3600 +``` + +如果使用云端 CUA,可改为: + +```text +Computer Use Runtime = sandbox +沙箱环境驱动器 = CUA +CUA Image = linux +CUA OS Type = linux +CUA Local Runtime = false +CUA API Key = +``` + +> [!WARNING] +> 不要把 CUA API Key 写入公开日志、截图或 issue。AstrBot 的运行日志不会输出该字段,但部署平台、Shell 历史和容器环境变量仍需自行保护。 + +### 使用 CUA 时的注意事项 + +- `linux` 镜像通常适合 Shell、Python、文件系统和桌面自动化测试。 +- 非 POSIX 镜像(如 `windows`、`android`)不一定支持 `sh`、`cat`、`ls`、`rm`、`base64` 等命令。AstrBot 对需要这些命令的 fallback 操作会返回明确错误。 +- 如果需要在 CUA sandbox 中打开浏览器或 GUI 程序,通常应使用 Shell 后台执行,例如显式传入 `background=true`,避免命令阻塞后续工具调用。 +- 直接把 sandbox 内的文件路径发送给用户通常不可行。应优先使用 AstrBot 的沙盒下载工具,将文件下载到 AstrBot 临时目录后再发送。 +- CUA 与 Shipyard Neo 的 workspace 语义不同。Shipyard Neo 固定使用 `/workspace`;CUA 的工作目录和文件路径取决于镜像与运行时。 + +### 何时选择 CUA + +建议在以下场景选择 `CUA`: + +- 需要桌面截图、鼠标点击、键盘输入等 GUI 自动化能力。 +- 需要测试不同 OS 镜像中的行为,例如 Linux、Windows、Android。 +- 已经在本机或云端部署好 CUA 运行环境。 + +如果只是需要稳定的 Python/Shell/文件系统沙盒,且不需要桌面 GUI 操作,通常优先选择 `Shipyard Neo`。它与 AstrBot 的 workspace、Skills 同步和长期运行模式更贴合。 + ## 性能要求 AstrBot 给每个沙盒环境限制最高 1 CPU 和 512 MB 内存。 @@ -388,4 +492,4 @@ Shipyard 会自动将沙盒环境中的 /home 目录挂载到宿主机的 `${PWD ### luosheng520qaq/astrobot_plugin_code_executor -如果您资源有限,不希望使用沙盒环境来执行代码,可以尝试 luosheng520qaq 开发的 [astrobot_plugin_code_executor](https://github.com/luosheng520qaq/astrobot_plugin_code_executor) 插件。该插件会直接在宿主机上执行代码。插件已经尽力提升安全性,但仍需留意代码安全性问题。 \ No newline at end of file +如果您资源有限,不希望使用沙盒环境来执行代码,可以尝试 luosheng520qaq 开发的 [astrobot_plugin_code_executor](https://github.com/luosheng520qaq/astrobot_plugin_code_executor) 插件。该插件会直接在宿主机上执行代码。插件已经尽力提升安全性,但仍需留意代码安全性问题。 diff --git a/docs/zh/use/computer.md b/docs/zh/use/computer.md index bf0c6ecfc0..2b98420813 100644 --- a/docs/zh/use/computer.md +++ b/docs/zh/use/computer.md @@ -97,7 +97,12 @@ 
data/workspaces/{normalized_umo}/notes/todo.txt 在沙盒中,Agent 仍然可以使用 Shell、Python、文件系统工具;如果所选沙盒 profile 支持 `browser` capability,还会挂载浏览器自动化工具。 -使用 Shipyard Neo 时,沙盒 workspace 根目录通常是: +沙盒环境驱动器可在 `配置 -> 普通配置 -> 使用电脑能力` 的沙箱配置中选择。当前常用选项包括: + +- `Shipyard Neo`:AstrBot 推荐的远程/独立部署沙盒服务,适合长期运行和多人使用。 +- `CUA`:基于 [CUA](https://github.com/trycua/cua) 的本地或云端电脑使用沙盒,可提供桌面截图、鼠标、键盘、Shell、Python 和文件系统能力。 + +使用 `Shipyard Neo` 时,沙盒 workspace 根目录通常是: ```text /workspace @@ -115,7 +120,9 @@ result.txt /workspace/result.txt ``` -沙盒部署、profile、TTL、数据持久化、浏览器能力等内容请参考:[Agent 沙盒环境](/use/astrbot-agent-sandbox)。 +使用 `CUA` 时,工作目录和可用命令取决于所选 CUA image 与运行方式。Linux CUA 容器通常提供类 Unix Shell;Windows、Android 等非 POSIX 镜像不保证支持 `sh`、`ls`、`rm`、`base64` 等命令,AstrBot 会对部分 shell fallback 操作返回明确错误。 + +沙盒部署、驱动器选择、CUA 配置、profile、TTL、数据持久化、浏览器能力等内容请参考:[Agent 沙盒环境](/use/astrbot-agent-sandbox)。 > [!NOTE] > 即使在 `sandbox` 模式下,“需要 AstrBot 管理员权限”仍会影响 Shell、Python、浏览器、上传下载等工具的调用权限。具体权限取决于你的配置。 diff --git a/tests/unit/test_astr_main_agent.py b/tests/unit/test_astr_main_agent.py index 5a5bceae15..faae767345 100644 --- a/tests/unit/test_astr_main_agent.py +++ b/tests/unit/test_astr_main_agent.py @@ -1561,6 +1561,36 @@ def test_apply_sandbox_tools_adds_sandbox_prompt(self, mock_context): assert "sandboxed environment" in req.system_prompt + def test_apply_sandbox_tools_with_cua_adds_gui_guidance(self, mock_context): + """Test that CUA sandbox guidance nudges reliable GUI workflows.""" + module = ama + config = module.MainAgentBuildConfig( + tool_call_timeout=60, + computer_use_runtime="sandbox", + sandbox_cfg={"booter": "cua"}, + ) + req = ProviderRequest(prompt="Test", system_prompt="Original prompt") + + module._apply_sandbox_tools(config, req, "session-123") + + assert req.func_tool is not None + tool_names = req.func_tool.names() + assert "astrbot_cua_screenshot" in tool_names + assert "astrbot_cua_mouse_click" in tool_names + assert "astrbot_cua_keyboard_type" in tool_names + assert "astrbot_cua_key_press" not in tool_names + + 
assert "Firefox" in req.system_prompt + assert "background=true" in req.system_prompt + assert 'firefox "https://example.com"' in req.system_prompt + assert "astrbot_cua_screenshot" in req.system_prompt + assert "astrbot_cua_key_press" not in req.system_prompt + assert "return_image_to_llm" in req.system_prompt + assert "astrbot_execute_shell" in req.system_prompt + assert "\\n" in req.system_prompt + assert "send_to_user=true" in req.system_prompt + assert "focused and empty or safe to append" in req.system_prompt + def test_apply_sandbox_tools_with_shipyard_booter(self, monkeypatch, mock_context): """Test sandbox tools with shipyard booter configuration.""" module = ama diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 1da02835b1..c49e97ca0f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -291,6 +291,27 @@ def test_nested_config_validation(self, temp_config_path): assert "level2" in config.nested["level1"] assert config.nested["level1"]["level2"]["value"] == 42 + def test_integrity_log_does_not_include_inserted_secret_value( + self, temp_config_path, monkeypatch + ): + """Default values may contain secrets and should not be logged.""" + from astrbot.core.config import astrbot_config + + existing_config = {} + default_config = {"api_key": "secret-value"} + messages = [] + with open(temp_config_path, "w", encoding="utf-8-sig") as f: + json.dump(existing_config, f) + + monkeypatch.setattr(astrbot_config.logger, "info", messages.append) + + AstrBotConfig(config_path=temp_config_path, default_config=default_config) + + assert messages + assert all("secret-value" not in message for message in messages) + assert all("api_key" not in message for message in messages) + assert any("配置项不存在" in message for message in messages) + class TestConfigHotReload: """Tests for config hot reload functionality.""" diff --git a/tests/unit/test_cua_computer_use.py b/tests/unit/test_cua_computer_use.py new file mode 100644 index 
0000000000..dc8bb6aa3e --- /dev/null +++ b/tests/unit/test_cua_computer_use.py @@ -0,0 +1,1458 @@ +import asyncio +import base64 +import json +import shlex +from pathlib import Path + +import mcp +import pytest + +from astrbot.core.astr_agent_tool_exec import FunctionToolExecutor +from astrbot.core.config.default import CONFIG_METADATA_3 +from astrbot.core.provider.func_tool_manager import FunctionToolManager + + +class FakeContext: + def __init__(self, config: dict): + self._config = config + + def get_config(self, umo: str | None = None): + return self._config + + +class FakeShell: + def __init__(self): + self.commands = [] + + async def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + return {"stdout": "ok", "stderr": "", "exit_code": 0} + + +class ProcessShapeShell: + async def run(self, command: str, **kwargs): + return {"output": "shape-ok", "returncode": 0} + + +class CommandResultShapeShell: + def __init__(self, stdout: str = "shape-ok", stderr: str = "", returncode: int = 0): + self.commands = [] + self.stdout = stdout + self.stderr = stderr + self.returncode = returncode + + @property + def success(self): + return self.returncode == 0 + + async def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + return self + + +class FakePython: + async def run(self, code: str, **kwargs): + return {"output": "42", "error": ""} + + +class FakeFilesystem: + def __init__(self): + self.files = {} + + async def write_file(self, path: str, content: str): + self.files[path] = content + + async def read_file(self, path: str): + return self.files[path] + + async def delete(self, path: str): + self.files.pop(path, None) + + async def list_dir(self, path: str): + return [path] + + +class FakeMouse: + def __init__(self): + self.clicks = [] + + async def click(self, x: int, y: int, button: str = "left"): + self.clicks.append((x, y, button)) + return {"success": True} + + +class FakeKeyboard: + def __init__(self): + 
self.typed = [] + self.pressed = [] + + async def type(self, text: str): + self.typed.append(text) + return {"success": True} + + async def press(self, key: str): + self.pressed.append(key) + return {"success": True} + + +class FakeSandbox: + def __init__(self): + self.shell = FakeShell() + self.python = FakePython() + self.filesystem = FakeFilesystem() + self.mouse = FakeMouse() + self.keyboard = FakeKeyboard() + + async def screenshot(self): + return b"fake-png" + + +class SyncShell: + def __init__(self, stdout: str = "ok"): + self.commands = [] + self.stdout = stdout + + def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + return {"stdout": self.stdout, "stderr": "", "exit_code": 0} + + +class FailingShell: + def __init__(self): + self.commands = [] + + async def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + return { + "stdout": "", + "stderr": "python3: command not found", + "exit_code": 127, + "success": False, + } + + +class SandboxWithoutFilesystem: + def __init__(self): + self.shell = FakeShell() + self.python = FakePython() + + +class SyncPython: + def run(self, code: str, **kwargs): + return {"output": "sync", "error": ""} + + +def _agent_computer_use_items(): + return CONFIG_METADATA_3["ai_group"]["metadata"]["agent_computer_use"]["items"] + + +@pytest.mark.asyncio +async def test_get_booter_creates_cua_booter(monkeypatch): + from astrbot.core.computer import computer_client + + created = [] + + class FakeCuaBooter: + def __init__( + self, + image: str, + os_type: str, + ttl: int, + telemetry_enabled: bool, + local: bool, + api_key: str, + ): + created.append((image, os_type, ttl, telemetry_enabled, local, api_key)) + + async def boot(self, session_id: str): + self.session_id = session_id + + async def available(self): + return True + + monkeypatch.setattr( + computer_client, "_sync_skills_to_sandbox", lambda booter: asyncio.sleep(0) + ) + monkeypatch.setitem(computer_client.session_booter, 
"cua-test", None) + computer_client.session_booter.pop("cua-test", None) + monkeypatch.setattr( + "astrbot.core.computer.booters.cua.CuaBooter", + FakeCuaBooter, + raising=False, + ) + + ctx = FakeContext( + { + "provider_settings": { + "computer_use_runtime": "sandbox", + "sandbox": { + "booter": "cua", + "cua_image": "linux", + "cua_os_type": "linux", + "cua_ttl": 120, + "cua_telemetry_enabled": False, + "cua_local": True, + "cua_api_key": "", + }, + } + } + ) + + booter = await computer_client.get_booter(ctx, "cua-test") + + assert isinstance(booter, FakeCuaBooter) + assert created == [("linux", "linux", 120, False, True, "")] + + +def test_cua_ephemeral_kwargs_include_local_when_supported(): + from astrbot.core.computer.booters.cua import CuaBooter + + def ephemeral(image, ttl=None, telemetry_enabled=None, local=None): + return image, ttl, telemetry_enabled, local + + kwargs = CuaBooter( + ttl=120, telemetry_enabled=False, local=True + )._build_ephemeral_kwargs(ephemeral) + + assert kwargs == {"ttl": 120, "telemetry_enabled": False, "local": True} + + +def test_cua_ephemeral_kwargs_include_api_key_for_cloud_when_supported(): + from astrbot.core.computer.booters.cua import CuaBooter + + def ephemeral(image, local=None, api_key=None): + return image, local, api_key + + kwargs = CuaBooter(local=False, api_key="sk-test")._build_ephemeral_kwargs( + ephemeral + ) + + assert kwargs == {"local": False, "api_key": "sk-test"} + + +def test_cua_default_config_matches_booter_defaults(): + from astrbot.core.computer.booters.cua import CUA_DEFAULT_CONFIG, CuaBooter + from astrbot.core.config.default import DEFAULT_CONFIG + + booter = CuaBooter() + sandbox_defaults = DEFAULT_CONFIG["provider_settings"]["sandbox"] + + assert booter.image == CUA_DEFAULT_CONFIG["image"] + assert booter.os_type == CUA_DEFAULT_CONFIG["os_type"] + assert booter.ttl == CUA_DEFAULT_CONFIG["ttl"] + assert booter.telemetry_enabled == CUA_DEFAULT_CONFIG["telemetry_enabled"] + assert booter.local == 
CUA_DEFAULT_CONFIG["local"] + assert booter.api_key == CUA_DEFAULT_CONFIG["api_key"] + assert sandbox_defaults["cua_image"] == CUA_DEFAULT_CONFIG["image"] + assert sandbox_defaults["cua_os_type"] == CUA_DEFAULT_CONFIG["os_type"] + assert sandbox_defaults["cua_ttl"] == CUA_DEFAULT_CONFIG["ttl"] + assert ( + sandbox_defaults["cua_telemetry_enabled"] + == CUA_DEFAULT_CONFIG["telemetry_enabled"] + ) + assert sandbox_defaults["cua_local"] == CUA_DEFAULT_CONFIG["local"] + assert sandbox_defaults["cua_api_key"] == CUA_DEFAULT_CONFIG["api_key"] + + +@pytest.mark.asyncio +async def test_cua_config_log_does_not_include_api_key(monkeypatch): + from astrbot.core.computer import computer_client + + log_messages = [] + + class FakeCuaBooter: + def __init__(self, **kwargs): + self.kwargs = kwargs + + async def boot(self, session_id: str): + self.session_id = session_id + + async def available(self): + return True + + monkeypatch.setattr( + computer_client, "_sync_skills_to_sandbox", lambda booter: asyncio.sleep(0) + ) + monkeypatch.setitem(computer_client.session_booter, "cua-log-test", None) + computer_client.session_booter.pop("cua-log-test", None) + monkeypatch.setattr( + "astrbot.core.computer.booters.cua.CuaBooter", + FakeCuaBooter, + raising=False, + ) + monkeypatch.setattr(computer_client.logger, "info", log_messages.append) + + ctx = FakeContext( + { + "provider_settings": { + "computer_use_runtime": "sandbox", + "sandbox": { + "booter": "cua", + "cua_local": False, + "cua_api_key": "sk-secret-value", + }, + } + } + ) + + await computer_client.get_booter(ctx, "cua-log-test") + + assert log_messages + assert all("sk-secret-value" not in message for message in log_messages) + assert all("api_key" not in message for message in log_messages) + + +@pytest.mark.asyncio +async def test_get_booter_shuts_down_client_when_skill_sync_fails(monkeypatch): + from astrbot.core.computer import computer_client + + shutdowns = [] + + class FakeCuaBooter: + def __init__(self, **kwargs): + 
self.kwargs = kwargs + + async def boot(self, session_id: str): + self.session_id = session_id + + async def shutdown(self): + shutdowns.append(self.session_id) + + async def fail_sync(booter): + raise RuntimeError("sync failed") + + monkeypatch.setattr(computer_client, "_sync_skills_to_sandbox", fail_sync) + monkeypatch.setitem(computer_client.session_booter, "cua-sync-fail", None) + computer_client.session_booter.pop("cua-sync-fail", None) + monkeypatch.setattr( + "astrbot.core.computer.booters.cua.CuaBooter", + FakeCuaBooter, + raising=False, + ) + + ctx = FakeContext( + { + "provider_settings": { + "computer_use_runtime": "sandbox", + "sandbox": {"booter": "cua"}, + } + } + ) + + with pytest.raises(RuntimeError, match="sync failed"): + await computer_client.get_booter(ctx, "cua-sync-fail") + + assert len(shutdowns) == 1 + assert "cua-sync-fail" not in computer_client.session_booter + + +@pytest.mark.asyncio +async def test_cua_components_map_sdk_results(tmp_path): + from astrbot.core.computer.booters.cua import ( + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + ) + + sandbox = FakeSandbox() + + shell_result = await CuaShellComponent(sandbox).exec("echo ok", cwd="/workspace") + python_result = await CuaPythonComponent(sandbox).exec("print(42)") + fs = CuaFileSystemComponent(sandbox) + await fs.write_file("hello.txt", "hello") + read_result = await fs.read_file("hello.txt") + screenshot_path = tmp_path / "screen.png" + gui = CuaGUIComponent(sandbox) + screenshot_result = await gui.screenshot(str(screenshot_path)) + click_result = await gui.click(10, 20, button="right") + type_result = await gui.type_text("hello") + press_result = await gui.press_key("Enter") + + assert shell_result["stdout"] == "ok" + assert python_result["data"]["output"]["text"] == "42" + assert read_result["content"] == "hello" + assert screenshot_path.read_bytes() == b"fake-png" + assert screenshot_result["mime_type"] == "image/png" + assert 
click_result["success"] is True + assert type_result["success"] is True + assert press_result["success"] is True + assert sandbox.mouse.clicks == [(10, 20, "right")] + assert sandbox.keyboard.typed == ["hello"] + assert sandbox.keyboard.pressed == ["Enter"] + + +@pytest.mark.asyncio +async def test_cua_list_dir_returns_entries_list_for_shell_fallback(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = FakeSandbox() + delattr(sandbox, "filesystem") + + result = await CuaFileSystemComponent(sandbox).list_dir(".") + + assert result["success"] is True + assert result["entries"] == ["ok"] + assert sandbox.shell.commands[0][0] == "ls -1 ." + + +@pytest.mark.asyncio +async def test_cua_shell_filesystem_fallback_shell_quotes_paths(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + path = "folder/it's file.txt" + sandbox = FakeSandbox() + delattr(sandbox, "filesystem") + fs = CuaFileSystemComponent(sandbox) + + await fs.read_file(path) + await fs.delete_file(path) + await fs.list_dir(path) + + assert sandbox.shell.commands[0][0] == f"cat {shlex.quote(path)}" + assert sandbox.shell.commands[1][0] == f"rm -rf {shlex.quote(path)}" + assert sandbox.shell.commands[2][0] == f"ls -1 {shlex.quote(path)}" + + +@pytest.mark.asyncio +async def test_cua_write_file_shell_fallback_uses_python_base64_decoder(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = FakeSandbox() + delattr(sandbox, "filesystem") + + await CuaFileSystemComponent(sandbox).write_file("hello.txt", "hello") + + command = sandbox.shell.commands[0][0] + assert "python3 -c" in command + assert "base64 -d" not in command + + +@pytest.mark.asyncio +async def test_cua_create_file_reports_mode_as_informational(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = FakeSandbox() + + result = await CuaFileSystemComponent(sandbox).create_file("hello.txt", mode=0o600) + + assert result["success"] 
is True + assert result["mode"] == 0o600 + assert result["mode_applied"] is False + + +@pytest.mark.asyncio +async def test_cua_write_file_shell_fallback_propagates_shell_failure(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = FakeSandbox() + sandbox.shell = FailingShell() + delattr(sandbox, "filesystem") + + result = await CuaFileSystemComponent(sandbox).write_file("hello.txt", "hello") + + assert result["success"] is False + assert "requires python3" in result["stderr"] + assert "python3: command not found" in result["stderr"] + assert result["path"] == "hello.txt" + + +@pytest.mark.asyncio +async def test_cua_edit_file_propagates_write_failure(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + class ReadableButFailingWriteShell: + def __init__(self): + self.commands = [] + + async def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + if command.startswith("cat "): + return {"stdout": "hello old", "stderr": "", "exit_code": 0} + return { + "stdout": "", + "stderr": "permission denied", + "exit_code": 1, + "success": False, + } + + sandbox = FakeSandbox() + sandbox.shell = ReadableButFailingWriteShell() + delattr(sandbox, "filesystem") + + result = await CuaFileSystemComponent(sandbox).edit_file("hello.txt", "old", "new") + + assert result["success"] is False + assert result["stderr"] == "permission denied" + assert result["path"] == "hello.txt" + + +@pytest.mark.asyncio +async def test_cua_list_dir_shell_fallback_returns_filename_only_entries(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = FakeSandbox() + sandbox.shell = SyncShell("alpha.txt\nfolder\n") + delattr(sandbox, "filesystem") + + result = await CuaFileSystemComponent(sandbox).list_dir(".", show_hidden=True) + + assert result["entries"] == ["alpha.txt", "folder"] + assert sandbox.shell.commands[0][0] == "ls -1A ." 
+ + +@pytest.mark.asyncio +async def test_cua_shell_filesystem_fallback_rejects_non_posix_os_type(): + from astrbot.core.computer.booters.cua import CuaFileSystemComponent + + sandbox = SandboxWithoutFilesystem() + fs = CuaFileSystemComponent(sandbox, os_type="windows") + + read_result = await fs.read_file("hello.txt") + write_result = await fs.write_file("hello.txt", "hello") + delete_result = await fs.delete_file("hello.txt") + list_result = await fs.list_dir(".") + + for result in (read_result, write_result, delete_result, list_result): + assert result["success"] is False + assert ( + "filesystem shell fallback is only supported for POSIX" in result["error"] + ) + assert sandbox.shell.commands == [] + + +@pytest.mark.asyncio +async def test_cua_shell_and_python_accept_sync_sdk_methods(): + from astrbot.core.computer.booters.cua import CuaPythonComponent, CuaShellComponent + + sandbox = FakeSandbox() + sandbox.shell = SyncShell() + sandbox.python = SyncPython() + + shell_result = await CuaShellComponent(sandbox).exec("echo ok") + python_result = await CuaPythonComponent(sandbox).exec("print('ok')") + + assert shell_result["stdout"] == "ok" + assert python_result["data"]["output"]["text"] == "sync" + + +@pytest.mark.asyncio +async def test_cua_shell_normalizes_output_returncode_shape(): + from astrbot.core.computer.booters.cua import CuaShellComponent + + sandbox = FakeSandbox() + sandbox.shell = ProcessShapeShell() + + result = await CuaShellComponent(sandbox).exec("echo ok") + + assert result == { + "stdout": "shape-ok", + "stderr": "", + "exit_code": 0, + "success": True, + } + + +@pytest.mark.asyncio +async def test_cua_shell_normalizes_command_result_object_shape(): + from astrbot.core.computer.booters.cua import CuaShellComponent + + sandbox = FakeSandbox() + sandbox.shell = CommandResultShapeShell(stdout="hello\n", returncode=0) + + result = await CuaShellComponent(sandbox).exec("echo hello") + + assert result == { + "stdout": "hello\n", + "stderr": "", + 
"exit_code": 0, + "success": True, + } + + +@pytest.mark.asyncio +async def test_cua_shell_prefers_returncode_when_exit_code_is_none(): + from astrbot.core.computer.booters.cua import CuaShellComponent + + class ShellWithMixedExitCode: + async def run(self, command: str, **kwargs): + return { + "stdout": "", + "stderr": "", + "exit_code": None, + "returncode": 1, + } + + sandbox = FakeSandbox() + sandbox.shell = ShellWithMixedExitCode() + + result = await CuaShellComponent(sandbox).exec("false") + + assert result["exit_code"] == 1 + assert result["success"] is False + + +@pytest.mark.asyncio +async def test_cua_python_fallback_preserves_shell_command_result_stdout(): + from astrbot.core.computer.booters.cua import CuaPythonComponent + + sandbox = SandboxWithoutFilesystem() + sandbox.shell = CommandResultShapeShell(stdout="from python fallback\n") + delattr(sandbox, "python") + + result = await CuaPythonComponent(sandbox).exec("print('from python fallback')") + + assert result["success"] is True + assert result["output"] == "from python fallback\n" + assert result["data"]["output"]["text"] == "from python fallback\n" + + +@pytest.mark.asyncio +async def test_cua_shell_background_wrapper_detaches_via_python_subprocess(): + from astrbot.core.computer.booters.cua import CuaShellComponent + + sandbox = FakeSandbox() + + await CuaShellComponent(sandbox).exec( + "chromium https://example.com", background=True + ) + + command = sandbox.shell.commands[0][0] + assert command.startswith("python3 -c ") + assert "subprocess.Popen" in command + assert "start_new_session=True" in command + assert "p.pid" in command + assert "stdout=subprocess.DEVNULL" in command + assert "stderr=subprocess.DEVNULL" in command + assert "time.sleep(0.2)" in command + assert "'chromium https://example.com'" in command + assert "&" not in command + + +@pytest.mark.asyncio +async def test_cua_shell_background_rejects_non_posix_os_type(): + from astrbot.core.computer.booters.cua import 
CuaShellComponent + + sandbox = FakeSandbox() + + result = await CuaShellComponent(sandbox, os_type="windows").exec( + "start notepad", background=True + ) + + assert result == { + "stdout": "", + "stderr": "error: background shell execution is only supported for POSIX CUA images.", + "exit_code": 2, + "success": False, + } + assert sandbox.shell.commands == [] + + +@pytest.mark.asyncio +async def test_cua_upload_file_fallback_rejects_non_posix_os_type(tmp_path): + from astrbot.core.computer.booters.cua import ( + CuaBooter, + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + _CuaRuntime, + ) + + local_file = tmp_path / "upload.txt" + local_file.write_text("hello", encoding="utf-8") + sandbox = SandboxWithoutFilesystem() + booter = CuaBooter(os_type="windows") + booter._runtime = _CuaRuntime( + sandbox_cm=object(), + sandbox=sandbox, + shell=CuaShellComponent(sandbox, os_type="windows"), + python=CuaPythonComponent(sandbox, os_type="windows"), + fs=CuaFileSystemComponent(sandbox, os_type="windows"), + gui=CuaGUIComponent(sandbox), + ) + + result = await booter.upload_file(str(local_file), "remote.txt") + + assert result["success"] is False + assert "filesystem shell fallback is only supported for POSIX" in result["error"] + assert sandbox.shell.commands == [] + + +@pytest.mark.asyncio +async def test_cua_download_file_shell_quotes_remote_path(tmp_path): + from astrbot.core.computer.booters.cua import ( + CuaBooter, + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + _CuaRuntime, + ) + + class Base64Shell(FakeShell): + async def run(self, command: str, **kwargs): + self.commands.append((command, kwargs)) + return { + "stdout": base64.b64encode(b"hello").decode(), + "stderr": "", + "exit_code": 0, + } + + sandbox = SandboxWithoutFilesystem() + sandbox.shell = Base64Shell() + booter = CuaBooter() + booter._runtime = _CuaRuntime( + sandbox_cm=object(), + sandbox=sandbox, + 
shell=CuaShellComponent(sandbox), + python=CuaPythonComponent(sandbox), + fs=CuaFileSystemComponent(sandbox), + gui=CuaGUIComponent(sandbox), + ) + remote_path = "folder/it's file.txt" + local_path = tmp_path / "download.txt" + + await booter.download_file(remote_path, str(local_path)) + + assert sandbox.shell.commands[0][0] == f"base64 {shlex.quote(remote_path)}" + assert local_path.read_bytes() == b"hello" + + +@pytest.mark.asyncio +async def test_cua_download_file_fallback_rejects_non_posix_os_type(tmp_path): + from astrbot.core.computer.booters.cua import ( + CuaBooter, + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + _CuaRuntime, + ) + + sandbox = SandboxWithoutFilesystem() + booter = CuaBooter(os_type="windows") + booter._runtime = _CuaRuntime( + sandbox_cm=object(), + sandbox=sandbox, + shell=CuaShellComponent(sandbox, os_type="windows"), + python=CuaPythonComponent(sandbox, os_type="windows"), + fs=CuaFileSystemComponent(sandbox, os_type="windows"), + gui=CuaGUIComponent(sandbox), + ) + + with pytest.raises(RuntimeError, match="filesystem shell fallback"): + await booter.download_file("remote.txt", str(tmp_path / "download.txt")) + + assert sandbox.shell.commands == [] + + +@pytest.mark.asyncio +async def test_cua_boot_cleans_up_sandbox_when_component_setup_fails(monkeypatch): + from astrbot.core.computer.booters import cua as cua_booter + + closed = [] + + class FakeSandboxContext: + async def __aenter__(self): + return FakeSandbox() + + async def __aexit__(self, exc_type, exc, tb): + closed.append((exc_type, exc, tb)) + + class FakeImage: + @staticmethod + def linux(): + return "linux-image" + + class FakeSandboxFactory: + @staticmethod + def ephemeral(image, **kwargs): + return FakeSandboxContext() + + class BrokenShellComponent: + def __init__(self, sandbox, os_type="linux"): + raise RuntimeError("component setup failed") + + original_import = __import__ + + def fake_import(name, globals=None, locals=None, 
fromlist=(), level=0): + if name == "cua": + + class FakeCuaModule: + Image = FakeImage + Sandbox = FakeSandboxFactory + + return FakeCuaModule() + return original_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr("builtins.__import__", fake_import) + monkeypatch.setattr(cua_booter, "CuaShellComponent", BrokenShellComponent) + + booter = cua_booter.CuaBooter() + + with pytest.raises(RuntimeError, match="component setup failed"): + await booter.boot("session") + + assert len(closed) == 1 + assert booter._runtime is None + + +@pytest.mark.asyncio +async def test_cua_shell_background_reports_missing_python3_requirement(): + from astrbot.core.computer.booters.cua import CuaShellComponent + + sandbox = FakeSandbox() + sandbox.shell = FailingShell() + + result = await CuaShellComponent(sandbox).exec("firefox", background=True) + + assert result["success"] is False + assert "requires python3" in result["stderr"] + assert "python3: command not found" in result["stderr"] + + +@pytest.mark.asyncio +async def test_cua_python_fallback_reports_missing_python3_requirement(): + from astrbot.core.computer.booters.cua import CuaPythonComponent + + sandbox = SandboxWithoutFilesystem() + sandbox.shell = FailingShell() + delattr(sandbox, "python") + + result = await CuaPythonComponent(sandbox).exec("print('hello')") + + assert result["success"] is False + assert "requires python3" in result["error"] + assert "python3: command not found" in result["error"] + + +@pytest.mark.asyncio +async def test_cua_gui_reports_missing_mouse_or_keyboard(): + from astrbot.core.computer.booters.cua import CuaGUIComponent + + class SandboxWithoutGuiDevices: + async def screenshot(self): + return b"fake-png" + + gui = CuaGUIComponent(SandboxWithoutGuiDevices()) + + with pytest.raises(RuntimeError, match="mouse.*click"): + await gui.click(1, 2) + + with pytest.raises(RuntimeError, match="keyboard.*type"): + await gui.type_text("hello") + + with pytest.raises(RuntimeError, 
match="keyboard.*press"): + await gui.press_key("Enter") + + +@pytest.mark.asyncio +async def test_cua_gui_press_error_lists_probed_methods(): + from astrbot.core.computer.booters.cua import CuaGUIComponent + + class SandboxWithoutPress: + keyboard = object() + + gui = CuaGUIComponent(SandboxWithoutPress()) + + with pytest.raises(RuntimeError) as exc_info: + await gui.press_key("Enter") + + message = str(exc_info.value) + assert "keyboard.press" in message + assert "keyboard.key_press" in message + assert "keyboard.press_key" in message + + +@pytest.mark.asyncio +async def test_cua_gui_caches_component_methods_after_initialization(): + from astrbot.core.computer.booters.cua import CuaGUIComponent + + class CountingMouse: + def __init__(self): + self.click_lookups = 0 + self.clicks = [] + + def __getattribute__(self, name): + if name == "click": + object.__getattribute__(self, "__dict__")["click_lookups"] += 1 + return object.__getattribute__(self, name) + + async def click(self, x: int, y: int, button: str = "left"): + self.clicks.append((x, y, button)) + return {"success": True} + + class Sandbox: + def __init__(self): + self.mouse = CountingMouse() + + sandbox = Sandbox() + gui = CuaGUIComponent(sandbox) + + await gui.click(1, 2) + await gui.click(3, 4, button="right") + + assert sandbox.mouse.click_lookups == 1 + assert sandbox.mouse.clicks == [(1, 2, "left"), (3, 4, "right")] + + +def test_cua_capabilities_reflect_initialized_sandbox_gui_devices(): + from astrbot.core.computer.booters.cua import ( + CuaBooter, + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + _CuaRuntime, + ) + + def set_runtime(booter, sandbox): + shell = CuaShellComponent(sandbox) + booter._runtime = _CuaRuntime( + sandbox_cm=object(), + sandbox=sandbox, + shell=shell, + python=CuaPythonComponent(sandbox), + fs=CuaFileSystemComponent(sandbox), + gui=CuaGUIComponent(sandbox), + ) + + booter = CuaBooter() + set_runtime(booter, FakeSandbox()) + + assert 
booter.capabilities == ( + "python", + "shell", + "filesystem", + "gui", + "screenshot", + "mouse", + "keyboard", + ) + + class ScreenshotOnlySandbox: + shell = FakeShell() + + async def screenshot(self): + return b"fake-png" + + set_runtime(booter, ScreenshotOnlySandbox()) + + assert booter.capabilities == ("python", "shell", "filesystem", "gui", "screenshot") + + +@pytest.mark.asyncio +async def test_cua_shutdown_clears_cached_components(): + from astrbot.core.computer.booters.cua import ( + CuaBooter, + CuaFileSystemComponent, + CuaGUIComponent, + CuaPythonComponent, + CuaShellComponent, + _CuaRuntime, + ) + + closed = [] + + class FakeSandboxContext: + async def __aexit__(self, exc_type, exc, tb): + closed.append(True) + + booter = CuaBooter() + sandbox = FakeSandbox() + booter._runtime = _CuaRuntime( + sandbox_cm=FakeSandboxContext(), + sandbox=sandbox, + shell=CuaShellComponent(sandbox), + python=CuaPythonComponent(sandbox), + fs=CuaFileSystemComponent(sandbox), + gui=CuaGUIComponent(sandbox), + ) + + await booter.shutdown() + + assert closed == [True] + assert await booter.available() is False + assert booter._runtime is None + + +def test_cua_tools_are_registered_as_builtin_tools(): + from astrbot.core.tools.computer_tools.cua import ( + CuaKeyboardTypeTool, + CuaMouseClickTool, + CuaScreenshotTool, + ) + + manager = FunctionToolManager() + + assert manager.get_builtin_tool(CuaScreenshotTool).name == "astrbot_cua_screenshot" + assert manager.get_builtin_tool(CuaMouseClickTool).name == "astrbot_cua_mouse_click" + assert ( + manager.get_builtin_tool(CuaKeyboardTypeTool).name + == "astrbot_cua_keyboard_type" + ) + + +def test_cua_runtime_tools_are_available_to_handoffs(): + manager = FunctionToolManager() + + tools = FunctionToolExecutor._get_runtime_computer_tools("sandbox", manager, "cua") + + assert "astrbot_cua_screenshot" in tools + assert "astrbot_cua_mouse_click" in tools + assert "astrbot_cua_keyboard_type" in tools + assert "astrbot_cua_key_press" not 
in tools + + +def test_runtime_tool_selection_treats_none_booter_as_empty(): + manager = FunctionToolManager() + + tools = FunctionToolExecutor._get_runtime_computer_tools("sandbox", manager, None) + + assert "astrbot_execute_shell" in tools + assert "astrbot_cua_screenshot" not in tools + + +def test_runtime_tool_selection_normalizes_cua_booter_case(): + manager = FunctionToolManager() + + tools = FunctionToolExecutor._get_runtime_computer_tools("sandbox", manager, "CUA") + + assert "astrbot_cua_screenshot" in tools + + +def test_cua_is_exposed_in_sandbox_config_metadata(): + items = _agent_computer_use_items() + booter = items["provider_settings.sandbox.booter"] + + assert "cua" in booter["options"] + assert "CUA" in booter["labels"] + assert "provider_settings.sandbox.cua_image" in items + assert "provider_settings.sandbox.cua_os_type" in items + assert "provider_settings.sandbox.cua_ttl" in items + assert "provider_settings.sandbox.cua_telemetry_enabled" in items + assert "provider_settings.sandbox.cua_local" in items + assert "provider_settings.sandbox.cua_api_key" in items + assert ( + items["provider_settings.sandbox.cua_api_key"]["condition"][ + "provider_settings.sandbox.cua_local" + ] + is False + ) + + +_PNG_BYTES = base64.b64decode( + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII=" +) + + +@pytest.mark.asyncio +async def test_screenshot_tool_returns_image_and_sends_file(monkeypatch, tmp_path): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaScreenshotTool + + sent_messages = [] + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + async def send(self, message): + sent_messages.append(message) + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + { + "provider_settings": { + "computer_use_runtime": "sandbox", + "computer_use_require_admin": True, + "sandbox": {"booter": "cua"}, + } + } + ) + + class 
FakeWrapper: + context = FakeAstrContext() + + class FakeGUI: + async def screenshot(self, path: str): + Path(path).write_bytes(b"fake-png") + return { + "success": True, + "path": path, + "mime_type": "image/png", + "base64": base64.b64encode(b"fake-png").decode(), + } + + class FakeBooter: + gui = FakeGUI() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(cua_tools, "get_booter", fake_get_booter) + monkeypatch.setattr(cua_tools, "get_astrbot_temp_path", lambda: str(tmp_path)) + + result = await CuaScreenshotTool().call(FakeWrapper(), send_to_user=True) + + assert isinstance(result, mcp.types.CallToolResult) + image_parts = [part for part in result.content if part.type == "image"] + text_parts = [part for part in result.content if part.type == "text"] + payload = json.loads(text_parts[0].text) + assert image_parts[0].data == base64.b64encode(b"fake-png").decode() + assert "base64" not in payload + assert Path(payload["path"]).exists() + assert sent_messages + + +@pytest.mark.parametrize( + "screenshot_shape", + [ + "data_url", + "path_string", + "save_object", + "base64_dict", + ], +) +@pytest.mark.asyncio +async def test_screenshot_tool_normalizes_supported_screenshot_shapes( + monkeypatch, + tmp_path, + screenshot_shape, +): + from astrbot.core.computer.booters.cua import CuaGUIComponent + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaScreenshotTool + + sent_messages = [] + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + async def send(self, message): + sent_messages.append(message) + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + { + "provider_settings": { + "computer_use_runtime": "sandbox", + "computer_use_require_admin": True, + "sandbox": {"booter": "cua"}, + } + } + ) + + class FakeWrapper: + context = FakeAstrContext() + + class SaveObject: + def save(self, output, format): + assert format 
== "PNG" + output.write(_PNG_BYTES) + + class FakeSandbox: + async def screenshot(self): + if screenshot_shape == "data_url": + encoded = base64.b64encode(_PNG_BYTES).decode() + return f"data:image/png;base64,{encoded}" + if screenshot_shape == "path_string": + source_path = tmp_path / "source.png" + source_path.write_bytes(_PNG_BYTES) + return str(source_path) + if screenshot_shape == "save_object": + return SaveObject() + return {"base64": base64.b64encode(_PNG_BYTES).decode()} + + class FakeBooter: + gui = CuaGUIComponent(FakeSandbox()) + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(cua_tools, "get_booter", fake_get_booter) + monkeypatch.setattr(cua_tools, "get_astrbot_temp_path", lambda: str(tmp_path)) + + result = await CuaScreenshotTool().call(FakeWrapper(), send_to_user=True) + + assert isinstance(result, mcp.types.CallToolResult) + image_parts = [part for part in result.content if part.type == "image"] + text_parts = [part for part in result.content if part.type == "text"] + payload = json.loads(text_parts[0].text) + assert "base64" not in payload + assert payload["mime_type"] == "image/png" + assert Path(payload["path"]).read_bytes() == _PNG_BYTES + assert base64.b64decode(image_parts[0].data) == _PNG_BYTES + assert sent_messages + + +@pytest.mark.asyncio +async def test_screenshot_tool_can_opt_in_to_llm_image_content(monkeypatch, tmp_path): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaScreenshotTool + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + async def send(self, message): + pass + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + {"provider_settings": {"computer_use_require_admin": True}} + ) + + class FakeWrapper: + context = FakeAstrContext() + + class FakeGUI: + async def screenshot(self, path: str): + Path(path).write_bytes(b"fake-png") + return { + "success": True, + "path": 
path, + "mime_type": "image/png", + "base64": base64.b64encode(b"fake-png").decode(), + } + + class FakeBooter: + gui = FakeGUI() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(cua_tools, "get_booter", fake_get_booter) + monkeypatch.setattr(cua_tools, "get_astrbot_temp_path", lambda: str(tmp_path)) + + result = await CuaScreenshotTool().call( + FakeWrapper(), send_to_user=False, return_image_to_llm=True + ) + + image_parts = [part for part in result.content if part.type == "image"] + text_parts = [part for part in result.content if part.type == "text"] + payload = json.loads(text_parts[0].text) + assert image_parts[0].data == base64.b64encode(b"fake-png").decode() + assert "base64" not in payload + + +@pytest.mark.asyncio +async def test_screenshot_tool_can_opt_out_of_llm_image_content(monkeypatch, tmp_path): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaScreenshotTool + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + async def send(self, message): + pass + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + {"provider_settings": {"computer_use_require_admin": True}} + ) + + class FakeWrapper: + context = FakeAstrContext() + + class FakeGUI: + async def screenshot(self, path: str): + Path(path).write_bytes(b"fake-png") + return { + "success": True, + "path": path, + "mime_type": "image/png", + "base64": base64.b64encode(b"fake-png").decode(), + } + + class FakeBooter: + gui = FakeGUI() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(cua_tools, "get_booter", fake_get_booter) + monkeypatch.setattr(cua_tools, "get_astrbot_temp_path", lambda: str(tmp_path)) + + result = await CuaScreenshotTool().call( + FakeWrapper(), send_to_user=False, return_image_to_llm=False + ) + + image_parts = [part for part in result.content if part.type == "image"] + text_parts 
= [part for part in result.content if part.type == "text"] + payload = json.loads(text_parts[0].text) + assert image_parts == [] + assert "base64" not in payload + + +@pytest.mark.asyncio +async def test_cua_tools_return_permission_error_without_gui_lookup(monkeypatch): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import ( + CuaKeyboardTypeTool, + CuaMouseClickTool, + CuaScreenshotTool, + ) + + sent_messages = [] + + class FakeEvent: + unified_msg_origin = "umo" + role = "member" + + async def send(self, message): + sent_messages.append(message) + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext({"provider_settings": {}}) + + class FakeWrapper: + context = FakeAstrContext() + + async def fail_gui_lookup(context): + raise AssertionError("GUI lookup should not run after permission failure") + + monkeypatch.setattr(cua_tools, "check_admin_permission", lambda *args: "denied") + monkeypatch.setattr(cua_tools, "_get_gui_component", fail_gui_lookup) + + assert await CuaScreenshotTool().call(FakeWrapper()) == "denied" + assert await CuaMouseClickTool().call(FakeWrapper(), x=1, y=2) == "denied" + assert await CuaKeyboardTypeTool().call(FakeWrapper(), text="hello") == "denied" + assert sent_messages == [] + + +@pytest.mark.asyncio +async def test_cua_tools_include_exception_type_for_blank_error(monkeypatch): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaMouseClickTool + + class BlankError(Exception): + def __str__(self): + return "" + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + {"provider_settings": {"computer_use_require_admin": True}} + ) + + class FakeWrapper: + context = FakeAstrContext() + + async def fail_gui_lookup(context): + raise BlankError() + + monkeypatch.setattr(cua_tools, "_get_gui_component", 
fail_gui_lookup) + + assert await CuaMouseClickTool().call(FakeWrapper(), x=1, y=2) == ( + "Error clicking CUA desktop: BlankError" + ) + + +@pytest.mark.asyncio +async def test_cua_mouse_click_tool_happy_path_forwards_args_and_serializes_json( + monkeypatch, +): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaMouseClickTool + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + {"provider_settings": {"computer_use_require_admin": True}} + ) + + class FakeWrapper: + context = FakeAstrContext() + + class FakeGui: + def __init__(self): + self.clicked_args = None + + async def click(self, x: int, y: int, button: str = "left"): + self.clicked_args = (x, y, button) + return {"status": "ok", "x": x, "y": y, "button": button} + + fake_gui = FakeGui() + get_gui_called = {"value": False} + wrapper = FakeWrapper() + + async def fake_get_gui_component(context): + get_gui_called["value"] = True + assert context is wrapper + return fake_gui + + monkeypatch.setattr(cua_tools, "_get_gui_component", fake_get_gui_component) + + result = await CuaMouseClickTool().call(wrapper, x=10, y=20, button="right") + + assert get_gui_called["value"] is True + assert fake_gui.clicked_args == (10, 20, "right") + assert json.loads(result) == { + "status": "ok", + "x": 10, + "y": 20, + "button": "right", + } + + +@pytest.mark.asyncio +async def test_cua_keyboard_type_tool_happy_path_forwards_args_and_serializes_json( + monkeypatch, +): + from astrbot.core.tools.computer_tools import cua as cua_tools + from astrbot.core.tools.computer_tools.cua import CuaKeyboardTypeTool + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + event = FakeEvent() + context = FakeContext( + {"provider_settings": {"computer_use_require_admin": True}} + ) + + class FakeWrapper: + context = FakeAstrContext() + + class 
FakeGui: + def __init__(self): + self.typed_text_args = None + + async def type_text(self, text: str): + self.typed_text_args = (text,) + return {"status": "ok", "text": text} + + fake_gui = FakeGui() + get_gui_called = {"value": False} + wrapper = FakeWrapper() + + async def fake_get_gui_component(context): + get_gui_called["value"] = True + assert context is wrapper + return fake_gui + + monkeypatch.setattr(cua_tools, "_get_gui_component", fake_get_gui_component) + + result = await CuaKeyboardTypeTool().call(wrapper, text="Hello CUA") + + assert get_gui_called["value"] is True + assert fake_gui.typed_text_args == ("Hello CUA",) + assert json.loads(result) == {"status": "ok", "text": "Hello CUA"} diff --git a/tests/unit/test_func_tool_manager.py b/tests/unit/test_func_tool_manager.py index c87a2de085..4eae43b5ce 100644 --- a/tests/unit/test_func_tool_manager.py +++ b/tests/unit/test_func_tool_manager.py @@ -1,9 +1,15 @@ +import json + +import pytest + from astrbot.core import sp from astrbot.core.provider.func_tool_manager import FunctionToolManager from astrbot.core.tools.computer_tools.shell import ExecuteShellTool from astrbot.core.tools.message_tools import SendMessageToUserTool -from astrbot.core.tools.web_search_tools import FirecrawlExtractWebPageTool -from astrbot.core.tools.web_search_tools import FirecrawlWebSearchTool +from astrbot.core.tools.web_search_tools import ( + FirecrawlExtractWebPageTool, + FirecrawlWebSearchTool, +) def test_get_builtin_tool_by_class_returns_cached_instance(): @@ -39,9 +45,284 @@ def test_computer_tools_are_registered_as_builtin_tools(): tool = manager.get_builtin_tool(ExecuteShellTool) assert tool.name == "astrbot_execute_shell" + assert tool.parameters["properties"]["background"]["default"] is False assert manager.is_builtin_tool("astrbot_execute_shell") is True +@pytest.mark.asyncio +async def test_execute_shell_defaults_to_foreground(monkeypatch): + from astrbot.core.tools.computer_tools import shell as shell_tools + + 
calls = [] + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + calls.append({"command": command, "background": background}) + return {"success": True, "stdout": "", "stderr": "", "exit_code": 0} + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + + result = await ExecuteShellTool().call( + FakeWrapper(), command="chromium https://example.com" + ) + + assert json.loads(result)["success"] is True + assert calls == [{"command": "chromium https://example.com", "background": False}] + + +@pytest.mark.asyncio +async def test_execute_shell_uses_fresh_default_env_per_call(monkeypatch): + from astrbot.core.tools.computer_tools import shell as shell_tools + + calls = [] + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + env["MUTATED_BY_FAKE_SHELL"] = command + calls.append(env) + return {"success": True, "stdout": "", "stderr": "", "exit_code": 0} + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + tool = ExecuteShellTool() + + await tool.call(FakeWrapper(), command="first") + await tool.call(FakeWrapper(), 
command="second") + + assert calls[0] is not calls[1] + assert calls[0]["MUTATED_BY_FAKE_SHELL"] == "first" + assert calls[1] == {"MUTATED_BY_FAKE_SHELL": "second"} + + +@pytest.mark.asyncio +async def test_execute_shell_copies_user_env_before_execution(monkeypatch): + from astrbot.core.tools.computer_tools import shell as shell_tools + + calls = [] + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + env["MUTATED_BY_FAKE_SHELL"] = command + calls.append(env) + return {"success": True, "stdout": "", "stderr": "", "exit_code": 0} + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + original_env = {"FOO": "bar"} + + await ExecuteShellTool().call(FakeWrapper(), command="first", env=original_env) + + assert original_env == {"FOO": "bar"} + assert calls == [{"FOO": "bar", "MUTATED_BY_FAKE_SHELL": "first"}] + + +@pytest.mark.asyncio +async def test_execute_shell_avoids_double_background_for_detached_commands( + monkeypatch, +): + from astrbot.core.tools.computer_tools import shell as shell_tools + + calls = [] + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + calls.append({"command": command, "background": background}) + return {"success": True, "stdout": "", "stderr": "", "exit_code": 0} + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + 
context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + + command = "nohup firefox >/tmp/astrbot-firefox.log 2>&1 &" + result = await ExecuteShellTool().call( + FakeWrapper(), command=command, background=True + ) + + assert json.loads(result)["success"] is True + assert calls == [{"command": command, "background": False}] + + +@pytest.mark.asyncio +async def test_execute_shell_recognizes_commented_background_command(monkeypatch): + from astrbot.core.tools.computer_tools import shell as shell_tools + + calls = [] + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + calls.append({"command": command, "background": background}) + return {"success": True, "stdout": "", "stderr": "", "exit_code": 0} + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + + command = "firefox & # already detached" + result = await ExecuteShellTool().call( + FakeWrapper(), command=command, background=True + ) + + assert json.loads(result)["success"] is True + assert calls == [{"command": command, "background": False}] + + +@pytest.mark.parametrize( + ("command", "expected"), + [ + ("echo '#'", False), + ("echo '&'", False), + ("echo foo#bar &", True), + ("echo 'unterminated", False), + ("firefox & # already detached", True), + ("nohup firefox >/tmp/astrbot-firefox.log 2>&1 &", True), + ("firefox", False), + ], +) +def 
test_is_self_detached_command_handles_quotes_and_comments(command, expected): + from astrbot.core.tools.computer_tools.shell import _is_self_detached_command + + assert _is_self_detached_command(command) is expected + + +@pytest.mark.asyncio +async def test_execute_shell_reports_blank_exception_type(monkeypatch): + from astrbot.core.tools.computer_tools import shell as shell_tools + + class BlankError(Exception): + def __str__(self): + return "" + + class FakeShell: + async def exec(self, command, cwd=None, background=False, env=None): + raise BlankError() + + class FakeBooter: + shell = FakeShell() + + class FakeConfig: + def get_config(self, umo): + return {"provider_settings": {"computer_use_runtime": "sandbox"}} + + class FakeEvent: + unified_msg_origin = "umo" + role = "admin" + + class FakeAstrContext: + context = FakeConfig() + event = FakeEvent() + + class FakeWrapper: + context = FakeAstrContext() + + async def fake_get_booter(context, session_id): + return FakeBooter() + + monkeypatch.setattr(shell_tools, "get_booter", fake_get_booter) + + result = await ExecuteShellTool().call(FakeWrapper(), command="firefox") + + assert result == "Error executing command: BlankError" + + def test_firecrawl_tools_are_registered_as_builtin_tools(): manager = FunctionToolManager()