def confirm_plan(steps):
    """Display a proposed installation plan and ask the user to confirm.

    Args:
        steps: Iterable of human-readable plan steps, printed one per line.

    Returns:
        The user's lowercased choice string; expected values are
        "y" (yes), "e" (edit) or "n" (cancel), but the raw lowercased
        input is returned so callers can handle anything else.
    """
    print("\nProposed installation plan:\n")
    for step in steps:
        print(step)

    print("\nProceed?")
    print("[y] yes [e] edit [n] cancel")

    # strip() so stray whitespace (e.g. "y ") is not treated as an
    # unrecognized choice by callers comparing against "y"/"e"/"n".
    choice = input("> ").strip().lower()

    return choice
- - Args: - api_key: API key for the LLM provider - provider: Provider name ("openai", "claude", or "ollama") - model: Optional model name override - cache: Optional SemanticCache instance for response caching + """Args: + api_key: API key for the LLM provider + provider: Provider name ("openai", "claude", or "ollama") + model: Optional model name override + cache: Optional SemanticCache instance for response caching """ + from cortex.roles.loader import load_role + self.api_key = api_key self.provider = APIProvider(provider.lower()) + # Load role and system prompt + self.role_name = role + self.role = load_role(role) + self.system_prompt: str = self.role.get("system_prompt", "") if cache is None: try: @@ -112,35 +117,36 @@ def _get_system_prompt(self, simplified: bool = False) -> str: Args: simplified: If True, return a shorter prompt optimized for local models """ - if simplified: - return """You must respond with ONLY a JSON object. No explanations, no markdown, no code blocks. - -Format: {"commands": ["command1", "command2"]} - -Example input: install nginx -Example output: {"commands": ["sudo apt update", "sudo apt install -y nginx"]} - -Rules: -- Use apt for Ubuntu packages -- Add sudo for system commands -- Return ONLY the JSON object""" - return """You are a Linux system command expert. Convert natural language requests into safe, validated bash commands. - -Rules: -1. Return ONLY a JSON array of commands -2. Each command must be a safe, executable bash command -3. Commands should be atomic and sequential -4. Avoid destructive operations without explicit user confirmation -5. Use package managers appropriate for Debian/Ubuntu systems (apt) -6. Include necessary privilege escalation (sudo) when required -7. Validate command syntax before returning - -Format: -{"commands": ["command1", "command2", ...]} + if simplified: + base_prompt = ( + "You must respond with ONLY a JSON object. 
No explanations, no markdown.\n\n" + "Format:\n" + '{"commands": ["command1", "command2"]}\n\n' + "Rules:\n" + "- Use apt for Ubuntu packages\n" + "- Add sudo for system commands\n" + "- Return ONLY the JSON object\n" + ) + else: + base_prompt = ( + "You are a Linux system command expert. Convert natural language requests " + "into safe, validated bash commands.\n\n" + "Rules:\n" + "1. Return ONLY a JSON array of commands\n" + "2. Each command must be safe and executable\n" + "3. Commands should be atomic and sequential\n" + "4. Avoid destructive operations without explicit user confirmation\n" + "5. Use apt for Debian/Ubuntu systems\n" + "6. Include sudo when required\n\n" + "Format:\n" + '{"commands": ["command1", "command2", ...]}\n' + ) -Example request: "install docker with nvidia support" -Example response: {"commands": ["sudo apt update", "sudo apt install -y docker.io", "sudo apt install -y nvidia-docker2", "sudo systemctl restart docker"]}""" + system_prompt = getattr(self, "system_prompt", "") + if system_prompt: + return f"{self.system_prompt}\n\n{base_prompt}" + return base_prompt def _call_openai(self, user_input: str) -> list[str]: try: @@ -181,6 +187,7 @@ def _call_ollama(self, user_input: str) -> list[str]: enhanced_input = f"""{user_input} Respond with ONLY this JSON format (no explanations): + {{\"commands\": [\"command1\", \"command2\"]}}""" response = self.client.chat.completions.create( @@ -233,55 +240,39 @@ def _repair_json(self, content: str) -> str: def _parse_commands(self, content: str) -> list[str]: try: - # Strip markdown code blocks - if "```json" in content: - content = content.split("```json")[1].split("```")[0].strip() - elif "```" in content: + # Remove markdown code blocks if present + if "```" in content: parts = content.split("```") - if len(parts) >= 3: - content = parts[1].strip() + if len(parts) >= 2: + content = parts[1] - # Try to find JSON object in the content import re - # Look for {"commands": [...]} pattern - json_match 
= re.search( - r'\{\s*["\']commands["\']\s*:\s*\[.*?\]\s*\}', content, re.DOTALL + match = re.search( + r'\{\s*"commands"\s*:\s*\[.*?\]\s*\}', + content, + re.DOTALL, ) - if json_match: - content = json_match.group(0) - # Try to repair common JSON issues - content = self._repair_json(content) + if not match: + raise ValueError("No valid JSON command block found") - data = json.loads(content) - commands = data.get("commands", []) + json_text = match.group(0) + data = json.loads(json_text) + commands = data.get("commands") if not isinstance(commands, list): - raise ValueError("Commands must be a list") + raise ValueError("commands must be a list") - # Handle both formats: - # 1. ["cmd1", "cmd2"] - direct string array - # 2. [{"command": "cmd1"}, {"command": "cmd2"}] - object array - result = [] + cleaned: list[str] = [] for cmd in commands: - if isinstance(cmd, str): - # Direct string - if cmd: - result.append(cmd) - elif isinstance(cmd, dict): - # Object with "command" key - cmd_str = cmd.get("command", "") - if cmd_str: - result.append(cmd_str) - - return result - except (json.JSONDecodeError, ValueError) as e: - # Log the problematic content for debugging - import sys - - print(f"\nDebug: Failed to parse JSON. Raw content:\n{content[:500]}", file=sys.stderr) - raise ValueError(f"Failed to parse LLM response: {str(e)}") + if isinstance(cmd, str) and cmd.strip(): + cleaned.append(cmd.strip()) + + return cleaned + + except Exception as exc: + raise ValueError(f"Failed to parse LLM response: {exc}") from exc def _validate_commands(self, commands: list[str]) -> list[str]: dangerous_patterns = [ @@ -303,19 +294,6 @@ def _validate_commands(self, commands: list[str]) -> list[str]: return validated def parse(self, user_input: str, validate: bool = True) -> list[str]: - """Parse natural language input into shell commands. 
from typing import Any

from cortex.llm.interpreter import CommandInterpreter


def generate_plan(intent: str, slots: dict[str, Any]) -> list[str]:
    """Generate a human-readable installation plan using the LLM (Ollama).

    Args:
        intent: Natural-language description of what the user wants.
        slots: Structured details extracted from the request; interpolated
            verbatim (via its repr) into the prompt.

    Returns:
        A list of high-level, one-sentence plan steps parsed from the
        model's JSON response.
    """
    # NOTE: the original `from typing import Any, dict, list` raises
    # ImportError at import time — `dict` and `list` are builtins, not
    # typing exports, and the annotations below already use the builtin
    # generics (PEP 585). Only `Any` is needed from typing.

    prompt = f"""
You are a DevOps assistant.

User intent:
{intent}

Extracted details:
{slots}

Generate a step-by-step installation plan.
Rules:
- High-level steps only
- No shell commands
- One sentence per step
- Return as a JSON list of strings

Example:
["Install Python", "Install ML libraries", "Set up Jupyter"]
"""

    interpreter = CommandInterpreter(
        api_key="ollama",  # dummy value, Ollama ignores it
        provider="ollama",
    )

    # Reuse the interpreter's structured-output parsing.
    # validate=False: these are prose steps, not shell commands.
    steps = interpreter.parse(prompt, validate=False)

    return steps
+ """ + roles_dir = get_roles_dir() + role_file = os.path.join(roles_dir, f"{role_name}.yaml") + + if not os.path.exists(role_file): + if role_name != "default": + # Fallback to default role + return load_role("default") + raise RoleNotFoundError("Default role not found") + + with open(role_file, encoding="utf-8") as f: + return yaml.safe_load(f) diff --git a/cortex/roles/security.yaml b/cortex/roles/security.yaml new file mode 100644 index 00000000..57cc4776 --- /dev/null +++ b/cortex/roles/security.yaml @@ -0,0 +1,7 @@ +name: security +description: Security auditing and hardening expert +system_prompt: | + You are a security-focused AI assistant. + Always prioritize safety, least privilege, and risk mitigation. + Warn before destructive or irreversible actions. + Prefer secure defaults and compliance best practices. diff --git a/test_parallel_llm.py b/test_parallel_llm.py new file mode 100755 index 00000000..f154f2b8 --- /dev/null +++ b/test_parallel_llm.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +""" +Quick test script to verify parallel LLM calls are working. + +Run this to test: +1. Async completion works +2. Batch processing works +3. Rate limiting works +4. Helper functions work +""" + +import asyncio +import os +import sys +import time + +# Add parent directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".")) + +from cortex.llm_router import ( + LLMRouter, + TaskType, + check_hardware_configs_parallel, + diagnose_errors_parallel, + query_multiple_packages, +) + + +async def test_async_completion(): + """Test basic async completion.""" + print("=" * 60) + print("Test 1: Async Completion") + print("=" * 60) + + router = LLMRouter() + + if not router.claude_client_async and not router.kimi_client_async: + print("⚠️ No API keys found. 
Set ANTHROPIC_API_KEY or MOONSHOT_API_KEY") + print(" Skipping async completion test...") + return False + + try: + start = time.time() + response = await router.acomplete( + messages=[{"role": "user", "content": "Say 'Hello from async'"}], + task_type=TaskType.USER_CHAT, + max_tokens=50, + ) + elapsed = time.time() - start + + print("✅ Async completion successful!") + print(f" Provider: {response.provider.value}") + print(f" Latency: {elapsed:.2f}s") + print(f" Response: {response.content[:100]}") + print(f" Tokens: {response.tokens_used}") + return True + except Exception as e: + print(f"❌ Async completion failed: {e}") + return False + + +async def test_batch_processing(): + """Test batch processing.""" + print("\n" + "=" * 60) + print("Test 2: Batch Processing") + print("=" * 60) + + router = LLMRouter() + + if not router.claude_client_async and not router.kimi_client_async: + print("⚠️ No API keys found. Skipping batch test...") + return False + + try: + requests = [ + { + "messages": [{"role": "user", "content": "What is 1+1?"}], + "task_type": TaskType.USER_CHAT, + "max_tokens": 20, + }, + { + "messages": [{"role": "user", "content": "What is 2+2?"}], + "task_type": TaskType.USER_CHAT, + "max_tokens": 20, + }, + { + "messages": [{"role": "user", "content": "What is 3+3?"}], + "task_type": TaskType.USER_CHAT, + "max_tokens": 20, + }, + ] + + print(f"Processing {len(requests)} requests in parallel...") + start = time.time() + responses = await router.complete_batch(requests, max_concurrent=3) + elapsed = time.time() - start + + print("✅ Batch processing successful!") + print(f" Total time: {elapsed:.2f}s") + print(f" Average per request: {elapsed / len(requests):.2f}s") + + for i, response in enumerate(responses, 1): + if response.model == "error": + print(f" Request {i}: ❌ Error - {response.content}") + else: + print(f" Request {i}: ✅ {response.content[:50]}...") + + return all(r.model != "error" for r in responses) + except Exception as e: + print(f"❌ Batch 
processing failed: {e}") + import traceback + + traceback.print_exc() + return False + + +async def test_rate_limiting(): + """Test rate limiting.""" + print("\n" + "=" * 60) + print("Test 3: Rate Limiting") + print("=" * 60) + + router = LLMRouter() + router.set_rate_limit(max_concurrent=2) + + if not router.claude_client_async and not router.kimi_client_async: + print("⚠️ No API keys found. Skipping rate limit test...") + return False + + try: + # Create 5 requests but limit to 2 concurrent + requests = [ + { + "messages": [{"role": "user", "content": f"Count: {i}"}], + "task_type": TaskType.USER_CHAT, + "max_tokens": 10, + } + for i in range(5) + ] + + print(f"Processing {len(requests)} requests with max_concurrent=2...") + start = time.time() + responses = await router.complete_batch(requests, max_concurrent=2) + elapsed = time.time() - start + + print("✅ Rate limiting working!") + print(f" Total time: {elapsed:.2f}s") + print(f" Semaphore value: {router._rate_limit_semaphore._value}") + return True + except Exception as e: + print(f"❌ Rate limiting test failed: {e}") + return False + + +async def test_helper_functions(): + """Test helper functions.""" + print("\n" + "=" * 60) + print("Test 4: Helper Functions") + print("=" * 60) + + router = LLMRouter() + + if not router.claude_client_async and not router.kimi_client_async: + print("⚠️ No API keys found. Skipping helper function tests...") + return False + + results = [] + + # Test query_multiple_packages + try: + print("\n4a. Testing query_multiple_packages...") + packages = ["nginx", "postgresql"] + responses = await query_multiple_packages(router, packages, max_concurrent=2) + print(f" ✅ Queried {len(responses)} packages") + results.append(True) + except Exception as e: + print(f" ❌ Failed: {e}") + results.append(False) + + # Test diagnose_errors_parallel + try: + print("\n4b. 
Testing diagnose_errors_parallel...") + errors = ["Test error 1", "Test error 2"] + diagnoses = await diagnose_errors_parallel(router, errors, max_concurrent=2) + print(f" ✅ Diagnosed {len(diagnoses)} errors") + results.append(True) + except Exception as e: + print(f" ❌ Failed: {e}") + results.append(False) + + # Test check_hardware_configs_parallel + try: + print("\n4c. Testing check_hardware_configs_parallel...") + components = ["nvidia_gpu", "intel_cpu"] + configs = await check_hardware_configs_parallel(router, components, max_concurrent=2) + print(f" ✅ Checked {len(configs)} components") + results.append(True) + except Exception as e: + print(f" ❌ Failed: {e}") + results.append(False) + + return all(results) + + +async def test_performance_comparison(): + """Compare sequential vs parallel performance.""" + print("\n" + "=" * 60) + print("Test 5: Performance Comparison") + print("=" * 60) + + router = LLMRouter() + + if not router.claude_client_async and not router.kimi_client_async: + print("⚠️ No API keys found. 
Skipping performance test...") + return False + + try: + requests = [ + { + "messages": [{"role": "user", "content": f"Request {i}"}], + "task_type": TaskType.USER_CHAT, + "max_tokens": 20, + } + for i in range(3) + ] + + # Simulate sequential (would be slower) + print("Simulating sequential execution...") + start_seq = time.time() + for req in requests: + await router.acomplete( + **{k: v for k, v in req.items() if k != "task_type"}, task_type=req["task_type"] + ) + elapsed_seq = time.time() - start_seq + + # Parallel execution + print("Running parallel execution...") + start_par = time.time() + await router.complete_batch(requests, max_concurrent=3) + elapsed_par = time.time() - start_par + + speedup = elapsed_seq / elapsed_par if elapsed_par > 0 else 1.0 + print("\n✅ Performance comparison:") + print(f" Sequential: {elapsed_seq:.2f}s") + print(f" Parallel: {elapsed_par:.2f}s") + print(f" Speedup: {speedup:.2f}x") + + return speedup > 1.0 + except Exception as e: + print(f"❌ Performance test failed: {e}") + return False + + +async def main(): + """Run all tests.""" + print("\n" + "=" * 60) + print("Parallel LLM Calls - Test Suite") + print("=" * 60) + print("\nChecking API keys...") + + # Check for API keys + has_claude = bool(os.getenv("ANTHROPIC_API_KEY")) + has_kimi = bool(os.getenv("MOONSHOT_API_KEY")) + + if has_claude: + print("✅ ANTHROPIC_API_KEY found") + else: + print("⚠️ ANTHROPIC_API_KEY not set") + + if has_kimi: + print("✅ MOONSHOT_API_KEY found") + else: + print("⚠️ MOONSHOT_API_KEY not set") + + if not has_claude and not has_kimi: + print("\n❌ No API keys found!") + print(" Set at least one:") + print(" export ANTHROPIC_API_KEY='your-key'") + print(" export MOONSHOT_API_KEY='your-key'") + return + + print("\n" + "=" * 60) + print("Running tests...") + print("=" * 60) + + results = [] + + # Run tests + results.append(await test_async_completion()) + results.append(await test_batch_processing()) + results.append(await test_rate_limiting()) + 
results.append(await test_helper_functions()) + results.append(await test_performance_comparison()) + + # Summary + print("\n" + "=" * 60) + print("Test Summary") + print("=" * 60) + passed = sum(results) + total = len(results) + print(f"\n✅ Passed: {passed}/{total}") + print(f"❌ Failed: {total - passed}/{total}") + + if all(results): + print("\n🎉 All tests passed! Parallel LLM calls are working correctly.") + else: + print("\n⚠️ Some tests failed. Check the output above for details.") + + return all(results) + + +if __name__ == "__main__": + success = asyncio.run(main()) + sys.exit(0 if success else 1) diff --git a/tests/integration/test_end_to_end.py b/tests/integration/test_end_to_end.py index 00776095..e92665f5 100644 --- a/tests/integration/test_end_to_end.py +++ b/tests/integration/test_end_to_end.py @@ -110,6 +110,9 @@ def test_project_tests_run_inside_container(self): "CORTEX_PROVIDER": "fake", "CORTEX_FAKE_COMMANDS": json.dumps({"commands": ["echo plan"]}), } + + result = self._run("python test/run_all_tests.py", env=env) + # Use PIP_BOOTSTRAP_DEV to install pytest and other dev dependencies effective_env = dict(BASE_ENV) effective_env.update(env)