Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 106 additions & 16 deletions src/apm_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from apm_cli.commands._helpers import (
ERROR,
RESET,
WARNING,
_check_and_notify_updates,
print_version,
)
Expand Down Expand Up @@ -74,34 +75,117 @@ def cli(ctx):
cli.add_command(marketplace_search, name="search")


def _get_current_code_page() -> "Optional[int]":
"""Get current Windows console code page using WinAPI.

Returns the code page number (e.g., 65001 for UTF-8, 950 for CP950).
Returns None if detection fails or on non-Windows platforms.
"""
if sys.platform != "win32":
return None

try:
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
return kernel32.GetConsoleOutputCP()
except Exception:
return None


def _code_page_to_encoding_name(cp: int) -> str:
"""Map code page number to readable encoding name.

Args:
cp: Code page number (e.g., 950, 65001).

Returns:
Human-readable encoding name or fallback name.
"""
cp_map = {
65001: "UTF-8",
950: "cp950 (Traditional Chinese)",
936: "cp936 (Simplified Chinese)",
932: "cp932 (Japanese)",
949: "cp949 (Korean)",
1252: "cp1252 (Western European)",
1251: "cp1251 (Cyrillic)",
}
return cp_map.get(cp, f"cp{cp}")


def _try_switch_to_utf8() -> bool:
"""Try to switch console to UTF-8 (code page 65001).

This function:
1. Checks if console is already UTF-8.
2. If not, attempts to switch using SetConsoleCP/SetConsoleOutputCP.
3. Verifies success by re-checking the code page.

Returns:
True if already UTF-8 or successfully switched, False otherwise.
"""
if sys.platform != "win32":
return True

try:
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]

# Check current console code page
current_cp = kernel32.GetConsoleOutputCP()
if current_cp == 65001:
return True # Already UTF-8

# Attempt to switch to UTF-8
kernel32.SetConsoleOutputCP(65001)
kernel32.SetConsoleCP(65001)

# Verify success
new_cp = kernel32.GetConsoleOutputCP()
return new_cp == 65001
except Exception:
return False


def _warn_encoding_issue(failed_cp: int) -> None:
"""Warn user if console UTF-8 switch failed.

Args:
failed_cp: The code page that failed to switch from.
"""
encoding_name = _code_page_to_encoding_name(failed_cp)
click.echo(
f"\n{WARNING}Warning: Console is {encoding_name}, UTF-8 switch failed.{RESET}\n",
err=True,
)
click.echo(
f"{WARNING}Display issues may occur. Suggestions:{RESET}",
err=True,
)
click.echo(" - Run: chcp 65001 (if available)", err=True)
click.echo(" - Or use: Windows Terminal or VS Code terminal\n", err=True)


def _configure_encoding() -> None:
"""Configure stdout/stderr for full Unicode on Windows.

The default Windows console encoding (cp1252) cannot represent many Unicode
characters used in APM output (box-drawing, check marks, arrows, etc.).
The default Windows console encoding (cp1252 or cp950) cannot represent many
Unicode characters used in APM output (box-drawing, check marks, arrows, etc.).

This function:
1. Sets ``PYTHONIOENCODING`` so child processes and redirected pipes
default to UTF-8.
2. Switches the console codepage to 65001 (UTF-8) via the Win32 API so
the terminal itself renders UTF-8 byte sequences correctly.
3. Reconfigures the Python text-mode streams to UTF-8.
1. Attempts to switch console to UTF-8 (code page 65001) via WinAPI.
2. Sets ``PYTHONIOENCODING`` for child processes.
3. Reconfigures Python text-mode streams to UTF-8.
4. Only warns if UTF-8 switch fails.

On non-Windows platforms this is a no-op.
"""
if sys.platform != "win32":
return

# 1. Help child processes / pipes default to UTF-8
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
# 1. Try to switch console to UTF-8
utf8_success = _try_switch_to_utf8()

# 2. Switch the console codepage to UTF-8
try:
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
kernel32.SetConsoleOutputCP(65001)
kernel32.SetConsoleCP(65001)
except (OSError, AttributeError):
pass # not a real console or ctypes unavailable
# 2. Help child processes / pipes default to UTF-8
os.environ.setdefault("PYTHONIOENCODING", "utf-8")

# 3. Reconfigure Python streams to UTF-8
for name in ("stdout", "stderr"):
Expand All @@ -115,6 +199,12 @@ def _configure_encoding() -> None:
except Exception:
pass

# 4. Warn only if UTF-8 switch failed
if not utf8_success:
current_cp = _get_current_code_page()
if current_cp and current_cp != 65001:
_warn_encoding_issue(current_cp)


def main():
"""Main entry point for the CLI."""
Expand Down
3 changes: 2 additions & 1 deletion src/apm_cli/commands/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def _auto_detect_author():

try:
result = subprocess.run(
["git", "config", "user.name"], capture_output=True, text=True, timeout=5
["git", "config", "user.name"], capture_output=True, text=True, encoding="utf-8", timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
Expand All @@ -382,6 +382,7 @@ def _auto_detect_description(project_name):
["git", "config", "--get", "remote.origin.url"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
Expand Down
1 change: 1 addition & 0 deletions src/apm_cli/commands/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def _validate_package_exists(package, verbose=False, auth_resolver=None):
cmd,
capture_output=True,
text=True,
encoding="utf-8",
timeout=30,
env=validate_env,
)
Expand Down
1 change: 1 addition & 0 deletions src/apm_cli/core/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def resolve_credential_from_git(host: str) -> Optional[str]:
input=f"protocol=https\nhost={host}\n\n",
capture_output=True,
text=True,
encoding="utf-8",
timeout=GitHubTokenManager._get_credential_timeout(),
env={**os.environ, 'GIT_TERMINAL_PROMPT': '0',
'GIT_ASKPASS': '' if sys.platform != 'win32' else 'echo'},
Expand Down
2 changes: 1 addition & 1 deletion src/apm_cli/deps/github_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ def _try_sparse_checkout(self, dep_ref: DependencyReference, temp_clone_path: Pa
for cmd in cmds:
result = subprocess.run(
cmd, cwd=str(temp_clone_path), env=env,
capture_output=True, text=True, timeout=120,
capture_output=True, text=True, encoding="utf-8", timeout=120,
)
if result.returncode != 0:
_debug(f"Sparse-checkout step failed ({' '.join(cmd)}): {result.stderr.strip()}")
Expand Down
1 change: 1 addition & 0 deletions src/apm_cli/policy/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def _extract_org_from_git_remote(
["git", "remote", "get-url", "origin"],
capture_output=True,
text=True,
encoding="utf-8",
cwd=project_root,
timeout=5,
)
Expand Down
3 changes: 2 additions & 1 deletion src/apm_cli/runtime/codex_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str:
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming
text=True,
encoding="utf-8",
bufsize=1, # Line buffered
universal_newlines=True
)

output_lines = []
Expand Down Expand Up @@ -107,6 +107,7 @@ def get_runtime_info(self) -> Dict[str, Any]:
["codex", "--version"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=10
)

Expand Down
3 changes: 2 additions & 1 deletion src/apm_cli/runtime/copilot_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str:
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming
text=True,
encoding="utf-8",
bufsize=1, # Line buffered
universal_newlines=True
)

output_lines = []
Expand Down Expand Up @@ -125,6 +125,7 @@ def get_runtime_info(self) -> Dict[str, Any]:
["copilot", "--version"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=10
)

Expand Down
14 changes: 7 additions & 7 deletions src/apm_cli/runtime/llm_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def __init__(self, model_name: Optional[str] = None):

# Verify llm CLI is available
try:
result = subprocess.run(['llm', '--version'],
capture_output=True, text=True, check=True)
result = subprocess.run(['llm', '--version'],
capture_output=True, text=True, encoding="utf-8", check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError("llm CLI not found. Please install: pip install llm")

Expand Down Expand Up @@ -52,8 +52,8 @@ def execute_prompt(self, prompt_content: str, **kwargs) -> str:
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Merge stderr into stdout for streaming
text=True,
encoding="utf-8",
bufsize=1, # Line buffered
universal_newlines=True
)

output_lines = []
Expand Down Expand Up @@ -86,8 +86,8 @@ def list_available_models(self) -> Dict[str, Any]:
Dict[str, Any]: Dictionary of available models and their info
"""
try:
result = subprocess.run(['llm', 'models', 'list'],
capture_output=True, text=True, check=True)
result = subprocess.run(['llm', 'models', 'list'],
capture_output=True, text=True, encoding="utf-8", check=True)
models = {}
for line in result.stdout.strip().split('\n'):
if line.strip():
Expand Down Expand Up @@ -136,8 +136,8 @@ def is_available() -> bool:
bool: True if runtime is available, False otherwise
"""
try:
subprocess.run(['llm', '--version'],
capture_output=True, text=True, check=True)
subprocess.run(['llm', '--version'],
capture_output=True, text=True, encoding="utf-8", check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
Expand Down
27 changes: 15 additions & 12 deletions src/apm_cli/runtime/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ def get_embedded_script(self, script_name: str) -> str:
bundle_dir = Path(sys._MEIPASS)
script_path = bundle_dir / "scripts" / "runtime" / script_name
if script_path.exists():
return script_path.read_text()
return script_path.read_text(encoding="utf-8")

# Fall back to direct file access for development
# Look for scripts relative to the repo structure
current_file = Path(__file__)
repo_root = current_file.parent.parent.parent.parent # Go up to repo root
script_path = repo_root / "scripts" / "runtime" / script_name
if script_path.exists():
return script_path.read_text()
return script_path.read_text(encoding="utf-8")

raise FileNotFoundError(f"Script not found: {script_name}")
except Exception as e:
Expand All @@ -85,15 +85,15 @@ def get_token_helper_script(self) -> str:
bundle_dir = Path(sys._MEIPASS)
script_path = bundle_dir / "scripts" / "github-token-helper.sh"
if script_path.exists():
return script_path.read_text()
return script_path.read_text(encoding="utf-8")

# Fall back to direct file access for development
# Look for scripts relative to the repo structure
current_file = Path(__file__)
repo_root = current_file.parent.parent.parent.parent # Go up to repo root
script_path = repo_root / "scripts" / "github-token-helper.sh"
if script_path.exists():
return script_path.read_text()
return script_path.read_text(encoding="utf-8")

raise FileNotFoundError("github-token-helper.sh not found")
except Exception as e:
Expand All @@ -111,35 +111,35 @@ def run_embedded_script(self, script_content: str, common_content: str,
if self._is_windows:
# Write common utilities as PowerShell
common_script = temp_path / "setup-common.ps1"
common_script.write_text(common_content)
common_script.write_text(common_content, encoding="utf-8")

# Write GitHub token helper (empty on Windows)
token_helper_content = self.get_token_helper_script()
if token_helper_content:
token_helper_script = temp_path / "github-token-helper.ps1"
token_helper_script.write_text(token_helper_content)
token_helper_script.write_text(token_helper_content, encoding="utf-8")

# Write main script as PowerShell
main_script = temp_path / "setup-script.ps1"
main_script.write_text(script_content)
main_script.write_text(script_content, encoding="utf-8")
else:
# Write common utilities as bash
common_script = temp_path / "setup-common.sh"
common_script.write_text(common_content)
common_script.write_text(common_content, encoding="utf-8")
common_script.chmod(0o755)

# Write GitHub token helper
try:
token_helper_content = self.get_token_helper_script()
token_helper_script = temp_path / "github-token-helper.sh"
token_helper_script.write_text(token_helper_content)
token_helper_script.write_text(token_helper_content, encoding="utf-8")
token_helper_script.chmod(0o755)
except Exception as e:
click.echo(f"{Fore.YELLOW}[!] Token helper not available, scripts may use fallback authentication: {e}{Style.RESET_ALL}")

# Write main script as bash
main_script = temp_path / "setup-script.sh"
main_script.write_text(script_content)
main_script.write_text(script_content, encoding="utf-8")
main_script.chmod(0o755)

# Execute script with environment that includes npm authentication
Expand All @@ -166,6 +166,7 @@ def run_embedded_script(self, script_content: str, common_content: str,
cwd=temp_dir,
capture_output=False, # Show output to user
text=True,
encoding="utf-8",
env=env
)
return result.returncode == 0
Expand Down Expand Up @@ -255,6 +256,7 @@ def list_runtimes(self) -> Dict[str, Dict[str, str]]:
version_cmd,
capture_output=True,
text=True,
encoding="utf-8",
timeout=5
)
if result.returncode == 0:
Expand Down Expand Up @@ -293,7 +295,8 @@ def remove_runtime(self, runtime_name: str) -> bool:
result = subprocess.run(
["npm", "uninstall", "-g", "@github/copilot"],
capture_output=True,
text=True
text=True,
encoding="utf-8",
)
if result.returncode == 0:
click.echo(f"{Fore.GREEN}[+] Successfully removed {runtime_name} runtime{Style.RESET_ALL}")
Expand Down
1 change: 1 addition & 0 deletions src/apm_cli/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def get_build_sha() -> str:
cwd=repo_root,
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
if result.returncode == 0:
Expand Down
Loading
Loading