Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/lmcode/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,17 @@ def _wrapper(*args: Any, **kwargs: Any) -> str:
if is_dangerous and self._mode == "ask" and name not in self._always_allowed_tools:
_print_tool_preview(name, merged, old_content=old_content)
path_or_cmd = merged.get("path") or merged.get("command") or ""
ans = display_interactive_approval(name, str(path_or_cmd))

live_obj = getattr(self, "_current_live", None)
if live_obj:
live_obj.stop()

try:
ans = display_interactive_approval(name, str(path_or_cmd))
finally:
if live_obj:
live_obj.start()

if ans is None:
return "error: Tool execution cancelled by user."
elif ans == "no":
Expand Down Expand Up @@ -959,6 +969,7 @@ async def _run_turn(self, model: Any, user_input: str, live: Any = None) -> tupl
When :attr:`_verbose` is ``True``, each tool is wrapped with
:func:`_wrap_tool_verbose` to print its call and result inline.
"""
self._current_live = live
chat = self._ensure_chat()
chat.add_user_message(user_input)

Expand Down
196 changes: 128 additions & 68 deletions src/lmcode/ui/_interactive_prompt.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,139 @@
from __future__ import annotations

from typing import Any
import sys

from prompt_toolkit import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea
from lmcode.ui.colors import ACCENT, ACCENT_BRIGHT, TEXT_MUTED

_RESET = "\033[0m"
_HIDE_CURSOR = "\033[?25l"
_SHOW_CURSOR = "\033[?25h"

def display_interactive_approval(tool_name: str, path_or_cmd: str) -> str | None:
"""Show an inline interactive approval menu for tools in 'ask' mode.
Returns:
"yes": user approved
"no": user denied
"always": user approved and wants to auto-allow this tool
"<string>": user typed a redirect instruction
None: user pressed Ctrl+C
"""
options = [
("yes", "Yes"),
("no", "No"),
("always", "Yes — and allow this tool automatically from now on"),
]
selected_index = 0

text_area = TextArea(
prompt="[Text input box] ...or tell lmcode what to do instead: ", multiline=False
)

def get_radio_text() -> list[tuple[str, str]]:
result: list[tuple[str, str]] = []
result.append(("", f"Allow this change? ({tool_name})\n"))
for i, (_val, label) in enumerate(options):
if i == selected_index:
result.append(("class:selected", f"❯ {label}\n"))
else:
result.append(("", f" {label}\n"))
return result

radio_window = Window(content=FormattedTextControl(get_radio_text), dont_extend_height=True)

root_container = HSplit([radio_window, text_area])

layout = Layout(root_container, focused_element=text_area)

kb = KeyBindings()
def _ansi_fg(hex_color: str) -> str:
"""Convert ``#rrggbb`` to an ANSI 24-bit foreground escape sequence."""
h = hex_color.lstrip("#")
r, g, b = int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)
return f"\033[38;2;{r};{g};{b}m"

@kb.add("up")
def _up(event: Any) -> None:
nonlocal selected_index
selected_index = max(0, selected_index - 1)

@kb.add("down")
def _down(event: Any) -> None:
nonlocal selected_index
selected_index = min(len(options) - 1, selected_index + 1)
def _read_key() -> str:
"""Read one keypress and return a normalised name.

@kb.add("enter")
def _enter(event: Any) -> None:
if text_area.text.strip():
event.app.exit(result=text_area.text)
else:
event.app.exit(result=options[selected_index][0])

# Keyboard interrupt handler
@kb.add("c-c")
def _ctrl_c(event: Any) -> None:
event.app.exit(result=None)
Handles arrow keys, Enter, Escape, and Ctrl-C cross-platform.
"""
if sys.platform == "win32":
import msvcrt

raw = msvcrt.getch()
if raw == b"\r":
return "enter"
if raw == b"\x1b":
return "escape"
if raw == b"\x03":
return "ctrl_c"
if raw in (b"\xe0", b"\x00"):
raw2 = msvcrt.getch()
if raw2 == b"H":
return "up"
if raw2 == b"P":
return "down"
return "other"
else:
import termios
import tty

fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
if ch in ("\r", "\n"):
return "enter"
if ch == "\x1b":
nxt = sys.stdin.read(1)
if nxt == "[":
nxt2 = sys.stdin.read(1)
if nxt2 == "A":
return "up"
if nxt2 == "B":
return "down"
return "escape"
if ch == "\x03":
return "ctrl_c"
return "other"
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)

app: Application[str | None] = Application(
layout=layout,
key_bindings=kb,
full_screen=False,
)

return app.run()
def display_interactive_approval(tool_name: str, path_or_cmd: str) -> str | None:
"""Arrow-key list selector to approve a tool execution securely.
Returns the user's decision or alternate instructions.
Calls `_read_key()` directly to avoid prompt_toolkit layout artifacts.
"""
choices = [
("yes", "Yes"),
("no", "No / Tell lmcode what to do instead"),
("always", "Yes — and allow this tool automatically from now on"),
]
title = f"Allow this change? ({tool_name})"

fg_accent = _ansi_fg(ACCENT)
fg_muted = _ansi_fg(TEXT_MUTED)
idx = [0]
total_lines = len(choices) + 4

def draw(first: bool = False) -> None:
if not first:
sys.stdout.write(f"\r\033[{total_lines}A\033[J")
sys.stdout.write(f"\r\n {fg_accent}{title}{_RESET}\n\n")
for i, (_, label) in enumerate(choices):
if i == idx[0]:
sys.stdout.write(f" {fg_accent}❯{_RESET} {label}\n")
else:
sys.stdout.write(f" {fg_muted}{label}{_RESET}\n")
sys.stdout.write(f"\n {fg_muted}↑↓ navigate · Enter confirm · Esc cancel{_RESET}")
sys.stdout.flush()

sys.stdout.write(_HIDE_CURSOR)
sys.stdout.flush()

result_code: str | None = None
try:
draw(first=True)
while True:
key = _read_key()
if key == "up":
idx[0] = max(0, idx[0] - 1)
draw()
elif key == "down":
idx[0] = min(len(choices) - 1, idx[0] + 1)
draw()
elif key == "enter":
sys.stdout.write(f"\r\033[{total_lines}A\033[J") # clear menu fully
sys.stdout.flush()
result_code = choices[idx[0]][0]
break
elif key in ("escape", "ctrl_c"):
sys.stdout.write(f"\r\033[{total_lines}A\033[J") # clear menu fully
sys.stdout.flush()
return None
finally:
sys.stdout.write(_SHOW_CURSOR)
sys.stdout.flush()

if result_code == "no":
fg_accent_bright = _ansi_fg(ACCENT_BRIGHT)
sys.stdout.write(
f"\r\033[K {fg_accent_bright}❯{_RESET} "
f"{fg_muted}Tell lmcode what to do instead: {_RESET}"
)
sys.stdout.flush()
try:
instructions = input().strip()
if instructions:
return instructions
return "no"
except (KeyboardInterrupt, EOFError):
return None

return result_code
Loading