139 changes: 121 additions & 18 deletions cortex/api_key_detector.py
@@ -1,20 +1,26 @@
"""
API Key Auto-Detection Module

-Automatically detects API keys from common locations without requiring
-user to set environment variables. Searches in order:
-
-1. Environment variables: ANTHROPIC_API_KEY, OPENAI_API_KEY
-2. ~/.cortex/.env
-3. ~/.config/anthropic (Claude CLI location)
-4. ~/.config/openai
-5. .env in current directory
-
-Implements caching to avoid repeated file checks and supports manual entry
-with optional saving to ~/.cortex/.env.
Automatically detects API keys and provider preferences from common locations
without requiring user to manually set environment variables.

Detection order (highest priority first):
1. CORTEX_PROVIDER=ollama environment variable (for explicit Ollama mode)
2. API key environment variables: ANTHROPIC_API_KEY, OPENAI_API_KEY
3. Cached key location (~/.cortex/.api_key_cache)
4. Saved Ollama provider preference in ~/.cortex/.env (CORTEX_PROVIDER=ollama)
5. API keys in ~/.cortex/.env
6. ~/.config/anthropic/credentials.json (Claude CLI location)
7. ~/.config/openai/credentials.json
8. .env in current directory

Implements caching to avoid repeated file checks and file locking for safe
concurrent access, and supports manual entry with optional saving to
~/.cortex/.env.

"""

import fcntl
import json
import os
import re
@@ -75,15 +81,55 @@ def detect(self) -> tuple[bool, str | None, str | None, str | None]:
            - provider: "anthropic", "openai", or "ollama" (or None)
- source: Where the key was found (or None)
"""
-        # Check cached location first
        # Check for an explicit CORTEX_PROVIDER=ollama environment variable first
if os.environ.get("CORTEX_PROVIDER", "").lower() == "ollama":
return (True, "ollama-local", "ollama", "environment")

# Check for API keys in environment variables (highest priority)
result = self._check_environment_api_keys()
if result:
return result

# Check cached location
result = self._check_cached_key()
if result:
return result

# Check in priority order
# Check for saved Ollama provider preference in config file
# (only if no API keys found in environment)
result = self._check_saved_ollama_provider()
if result:
return result

# Check other locations for API keys
result = self._check_all_locations()
return result or (False, None, None, None)
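
For orientation, a minimal usage sketch of the detection flow above; the class name and import path are assumptions drawn from the module context, not shown in this diff:

    from cortex.api_key_detector import APIKeyDetector  # class name assumed

    detector = APIKeyDetector()
    found, key, provider, source = detector.detect()
    if found and provider == "ollama":
        print("Local Ollama mode; no API key required")
    elif found:
        print(f"Using {provider} key from {source}")
    else:
        print("No key found; fall back to prompt_for_key()")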

def _check_environment_api_keys(self) -> tuple[bool, str, str, str] | None:
"""Check for API keys in environment variables."""
for env_var, provider in ENV_VAR_PROVIDERS.items():
value = os.environ.get(env_var)
if value:
return (True, value, provider, "environment")
return None

def _check_saved_ollama_provider(self) -> tuple[bool, str, str, str] | None:
"""Check if Ollama was previously selected as the provider in config file."""
env_file = Path.home() / CORTEX_DIR / CORTEX_ENV_FILE
if env_file.exists():
try:
content = env_file.read_text()
for line in content.splitlines():
line = line.strip()
if line.startswith("CORTEX_PROVIDER="):
value = line.split("=", 1)[1].strip().strip("\"'").lower()
if value == "ollama":
return (True, "ollama-local", "ollama", str(env_file))
except OSError:
# Ignore errors reading env file; treat as no configured provider
pass
return None
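
For reference, a ~/.cortex/.env that this parser treats as an Ollama preference could look like the following (illustrative contents; the quotes are optional, since the parser strips them):

    # ~/.cortex/.env
    CORTEX_PROVIDER="ollama"
    ANTHROPIC_API_KEY=<your-key>

Per the order in detect(), a saved CORTEX_PROVIDER=ollama wins over an API key stored in the same file, though not over keys set as environment variables or a previously cached key.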

def _check_cached_key(self) -> tuple[bool, str | None, str | None, str | None] | None:
"""Check if we have a cached key that still works."""
cached = self._get_cached_key()
@@ -173,6 +219,7 @@ def prompt_for_key(self) -> tuple[bool, str | None, str | None]:
return (False, None, None)

if provider == "ollama":
self._ask_to_save_ollama_preference()
return (True, "ollama-local", "ollama")

key = self._get_and_validate_key(provider)
@@ -182,6 +229,30 @@
self._ask_to_save_key(key, provider)
return (True, key, provider)

def _ask_to_save_ollama_preference(self) -> None:
"""Ask user if they want to save Ollama as their default provider."""
print(
f"\nSave Ollama as default provider to ~/{CORTEX_DIR}/{CORTEX_ENV_FILE}? [Y/n] ", end=""
)
try:
response = input().strip().lower()
except (EOFError, KeyboardInterrupt):
response = "n"

        if response not in ("n", "no"):
self._save_provider_to_env("ollama")
cx_print(f"✓ Provider preference saved to ~/{CORTEX_DIR}/{CORTEX_ENV_FILE}", "success")

def _save_provider_to_env(self, provider: str) -> None:
"""Save provider preference to ~/.cortex/.env with file locking."""
try:
env_file = Path.home() / CORTEX_DIR / CORTEX_ENV_FILE
self._locked_read_modify_write(
env_file, self._update_or_append_key, "CORTEX_PROVIDER", provider
)
except Exception as e:
cx_print(f"Warning: Could not save provider to ~/.cortex/.env: {e}", "warning")

def _get_provider_choice(self) -> str | None:
"""Get user's provider choice."""
cx_print("No API key found. Select a provider:", "warning")
@@ -407,6 +478,40 @@ def _atomic_write(self, target_file: Path, content: str) -> None:
temp_file.chmod(0o600)
temp_file.replace(target_file)

def _locked_read_modify_write(self, env_file: Path, modifier_func: callable, *args) -> None:
"""
Perform a locked read-modify-write operation on a file.

Uses file locking to prevent race conditions when multiple processes
try to modify the same file concurrently.

Args:
env_file: The file to modify
modifier_func: Function that takes (existing_content, *args) and returns new content
*args: Additional arguments to pass to modifier_func
"""
env_file.parent.mkdir(parents=True, exist_ok=True)
lock_file = env_file.with_suffix(".lock")

# Create lock file if it doesn't exist
lock_file.touch(exist_ok=True)

with open(lock_file, "r+") as lock_fd:
# Acquire exclusive lock (blocks until available)
fcntl.flock(lock_fd, fcntl.LOCK_EX)
try:
# Read current content
existing = env_file.read_text() if env_file.exists() else ""

# Apply modification
updated = modifier_func(existing, *args)

# Write atomically
self._atomic_write(env_file, updated)
finally:
# Release lock
fcntl.flock(lock_fd, fcntl.LOCK_UN)
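
To illustrate the modifier_func contract (existing content in, new content out), a hypothetical modifier; the real _update_or_append_key referenced above is defined elsewhere in this module and may differ:

    def set_env_var(existing: str, var_name: str, value: str) -> str:
        # Drop any prior assignment of var_name, then append the new one.
        kept = [ln for ln in existing.splitlines() if not ln.startswith(f"{var_name}=")]
        kept.append(f"{var_name}={value}")
        return "\n".join(kept) + "\n"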

def _cache_key_location(self, key: str, provider: str, source: str):
"""
Cache the location where a key was found.
@@ -487,7 +592,8 @@ def _save_key_to_env(self, key: str, provider: str):
"""
Save API key to ~/.cortex/.env.

-        Uses atomic write operations to prevent corruption from concurrent access.
Uses file locking and atomic write operations to prevent corruption
and lost updates from concurrent access.

Args:
key: The API key to save
Expand All @@ -496,10 +602,7 @@ def _save_key_to_env(self, key: str, provider: str):
try:
env_file = Path.home() / CORTEX_DIR / CORTEX_ENV_FILE
var_name = self._get_env_var_name(provider)
-            existing = self._read_env_file(env_file)
-            updated = self._update_or_append_key(existing, var_name, key)
-
-            self._atomic_write(env_file, updated)
self._locked_read_modify_write(env_file, self._update_or_append_key, var_name, key)

except Exception as e:
# If save fails, print warning but don't crash
6 changes: 5 additions & 1 deletion cortex/installation_history.py
@@ -9,6 +9,7 @@
import hashlib
import json
import logging
import os
import re
import sqlite3
import subprocess
@@ -80,10 +81,13 @@ def __init__(self, db_path: str = "/var/lib/cortex/history.db"):
self._init_database()

def _ensure_db_directory(self):
"""Ensure database directory exists"""
"""Ensure database directory exists and is writable"""
db_dir = Path(self.db_path).parent
try:
db_dir.mkdir(parents=True, exist_ok=True)
# Also check if we can actually write to this directory
if not os.access(db_dir, os.W_OK):
raise PermissionError(f"No write permission to {db_dir}")
except PermissionError:
# Fallback to user directory if system directory not accessible
user_dir = Path.home() / ".cortex"
3 changes: 3 additions & 0 deletions cortex/semantic_cache.py
@@ -80,6 +80,9 @@ def _ensure_db_directory(self) -> None:
db_dir = Path(self.db_path).parent
try:
db_dir.mkdir(parents=True, exist_ok=True)
# Also check if we can actually write to this directory
if not os.access(db_dir, os.W_OK):
raise PermissionError(f"No write permission to {db_dir}")
except PermissionError:
user_dir = Path.home() / ".cortex"
user_dir.mkdir(parents=True, exist_ok=True)
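
The hunks in installation_history.py and semantic_cache.py apply the same check-then-fallback pattern. A consolidated sketch of that pattern (the helper name is illustrative):

    import os
    from pathlib import Path

    def resolve_db_dir(preferred: Path) -> Path:
        """Return preferred if it can be created and written, else ~/.cortex."""
        try:
            preferred.mkdir(parents=True, exist_ok=True)
            # mkdir(exist_ok=True) succeeds on an existing directory even when
            # it is not writable (e.g. root-owned), hence the explicit check.
            if not os.access(preferred, os.W_OK):
                raise PermissionError(f"No write permission to {preferred}")
            return preferred
        except PermissionError:
            fallback = Path.home() / ".cortex"
            fallback.mkdir(parents=True, exist_ok=True)
            return fallback

This is why the added os.access check matters: without it, an existing but unwritable /var/lib/cortex passes mkdir silently, and the failure would likely surface only later, when SQLite tries to create the database file.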
75 changes: 67 additions & 8 deletions tests/test_ollama_integration.py
@@ -12,6 +12,7 @@
python tests/test_ollama_integration.py
"""

import os
import subprocess
import sys
from pathlib import Path
@@ -23,11 +24,68 @@

from cortex.llm_router import LLMProvider, LLMRouter, TaskType

-# Mark all tests to skip if Ollama is not available
-pytestmark = pytest.mark.skipif(
-    not subprocess.run(["which", "ollama"], capture_output=True).returncode == 0,
-    reason="Ollama is not installed. Install with: python scripts/setup_ollama.py",
-)

def get_available_ollama_model() -> str | None:
"""Get the first available Ollama model, or None if none available."""
try:
result = subprocess.run(
["ollama", "list"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
# Parse output: skip header line, get first model name
lines = result.stdout.strip().split("\n")
if len(lines) > 1:
# Model name is the first column
parts = lines[1].split()
if parts:
model_name = parts[0]
return model_name
except Exception:
# Best-effort helper: on any error, behave as if no models are available.
pass
return None
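
The parsing above assumes ollama list output of roughly this shape, with a header row and the model name in the first column of each data row:

    NAME               ID            SIZE      MODIFIED
    llama3.2:latest    <model id>    2.0 GB    2 days ago

so get_available_ollama_model() would return "llama3.2:latest" here.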


def is_ollama_installed() -> bool:
"""Check if Ollama is installed."""
return subprocess.run(["which", "ollama"], capture_output=True).returncode == 0


def is_ollama_running() -> bool:
"""Check if Ollama service is running."""
try:
result = subprocess.run(
["ollama", "list"],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0
except Exception:
return False


# Get available model for tests (can be overridden via env var)
OLLAMA_TEST_MODEL = os.environ.get("OLLAMA_TEST_MODEL") or get_available_ollama_model()

# Mark all tests to skip if Ollama is not available or no models installed
pytestmark = [
pytest.mark.skipif(
not is_ollama_installed(),
reason="Ollama is not installed. Install with: python scripts/setup_ollama.py",
),
pytest.mark.skipif(
not is_ollama_running(),
reason="Ollama service is not running. Start with: ollama serve",
),
pytest.mark.skipif(
OLLAMA_TEST_MODEL is None,
reason="No Ollama models installed. Install with: ollama pull llama3.2",
),
]
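
Because the environment variable is consulted first, a test run can be pinned to a specific model, e.g. OLLAMA_TEST_MODEL=llama3.2 python -m pytest tests/test_ollama_integration.py, rather than taking whichever model ollama list reports first.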


def check_ollama_installed():
@@ -72,12 +130,13 @@ def check_ollama_running():
def test_llm_router():
"""Test LLMRouter with Ollama."""
print("3. Testing LLM Router with Ollama...")
print(f" Using model: {OLLAMA_TEST_MODEL}")

try:
# Initialize router with Ollama
router = LLMRouter(
ollama_base_url="http://localhost:11434",
-            ollama_model="llama3.2",
ollama_model=OLLAMA_TEST_MODEL,
default_provider=LLMProvider.OLLAMA,
enable_fallback=False, # Don't fall back to cloud APIs
)
@@ -118,7 +177,7 @@ def test_routing_decision():
try:
router = LLMRouter(
ollama_base_url="http://localhost:11434",
-            ollama_model="llama3.2",
ollama_model=OLLAMA_TEST_MODEL,
default_provider=LLMProvider.OLLAMA,
)

@@ -148,7 +207,7 @@ def test_stats_tracking():
try:
router = LLMRouter(
ollama_base_url="http://localhost:11434",
-            ollama_model="llama3.2",
ollama_model=OLLAMA_TEST_MODEL,
default_provider=LLMProvider.OLLAMA,
track_costs=True,
)