Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 49 additions & 52 deletions src/copilot_usage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ def summary(
path = path or ctx.obj.get("path")
try:
sessions = get_all_sessions(path)
render_summary(
sessions, since=ensure_aware_opt(since), until=ensure_aware_opt(until)
)
except Exception as exc: # noqa: BLE001
click.echo(f"Error: {exc}", err=True)
except OSError as exc:
click.echo(f"Error reading sessions: {exc}", err=True)
sys.exit(1)
render_summary(
sessions, since=ensure_aware_opt(since), until=ensure_aware_opt(until)
)


# ---------------------------------------------------------------------------
Expand All @@ -341,45 +341,42 @@ def session(ctx: click.Context, session_id: str, path: Path | None) -> None:
path = path or ctx.obj.get("path")
try:
event_paths = discover_sessions(path)
if not event_paths:
click.echo("No sessions found.", err=True)
sys.exit(1)

# Fast path: skip directories that clearly cannot match the prefix.
# Only apply the pre-filter on UUID-shaped directory names (36 chars
# with 4 dashes), where the directory name IS the session ID.
# Non-UUID dirs (e.g. test fixtures) always need a full parse.
available: list[str] = []
for events_path in event_paths:
dir_name = events_path.parent.name
is_uuid_dir = len(dir_name) == 36 and dir_name.count("-") == 4
if (
len(session_id) >= 4
and is_uuid_dir
and not dir_name.startswith(session_id)
):
available.append(dir_name[:8])
continue
events = parse_events(events_path)
if not events:
continue
s = build_session_summary(events, session_dir=events_path.parent)
if s.session_id.startswith(session_id):
render_session_detail(events, s)
return
if s.session_id:
available.append(s.session_id[:8])

click.echo(f"Error: no session matching '{session_id}'", err=True)
if available:
click.echo(f"Available: {', '.join(available)}", err=True)
except OSError as exc:
click.echo(f"Error reading sessions: {exc}", err=True)
sys.exit(1)
except SystemExit:
raise
except Exception as exc: # noqa: BLE001
click.echo(f"Error: {exc}", err=True)
if not event_paths:
click.echo("No sessions found.", err=True)
sys.exit(1)

# Fast path: skip directories that clearly cannot match the prefix.
# Only apply the pre-filter on UUID-shaped directory names (36 chars
# with 4 dashes), where the directory name IS the session ID.
# Non-UUID dirs (e.g. test fixtures) always need a full parse.
available: list[str] = []
for events_path in event_paths:
dir_name = events_path.parent.name
is_uuid_dir = len(dir_name) == 36 and dir_name.count("-") == 4
if len(session_id) >= 4 and is_uuid_dir and not dir_name.startswith(session_id):
available.append(dir_name[:8])
continue
try:
events = parse_events(events_path)
except OSError:
continue
if not events:
continue
s = build_session_summary(events, session_dir=events_path.parent)
if s.session_id.startswith(session_id):
render_session_detail(events, s)
return
if s.session_id:
available.append(s.session_id[:8])

click.echo(f"Error: no session matching '{session_id}'", err=True)
if available:
click.echo(f"Available: {', '.join(available)}", err=True)
sys.exit(1)


# ---------------------------------------------------------------------------
# cost
Expand Down Expand Up @@ -417,16 +414,16 @@ def cost(
path = path or ctx.obj.get("path")
try:
sessions = get_all_sessions(path)

render_cost_view(
sessions,
since=ensure_aware_opt(since),
until=ensure_aware_opt(until),
)
except Exception as exc: # noqa: BLE001
click.echo(f"Error: {exc}", err=True)
except OSError as exc:
click.echo(f"Error reading sessions: {exc}", err=True)
sys.exit(1)

render_cost_view(
sessions,
since=ensure_aware_opt(since),
until=ensure_aware_opt(until),
)


# ---------------------------------------------------------------------------
# live
Expand All @@ -447,7 +444,7 @@ def live(ctx: click.Context, path: Path | None) -> None:
path = path or ctx.obj.get("path")
try:
sessions = get_all_sessions(path)
render_live_sessions(sessions)
except Exception as exc: # noqa: BLE001
click.echo(f"Error: {exc}", err=True)
except OSError as exc:
click.echo(f"Error reading sessions: {exc}", err=True)
sys.exit(1)
render_live_sessions(sessions)
4 changes: 2 additions & 2 deletions src/copilot_usage/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ def get_all_sessions(base_path: Path | None = None) -> list[SessionSummary]:
for events_path in paths:
try:
events = parse_events(events_path)
except (FileNotFoundError, OSError) as exc:
logger.warning("Skipping vanished session %s: %s", events_path, exc)
except OSError as exc:
logger.warning("Skipping vanished session {}: {}", events_path, exc)
continue
if not events:
continue
Expand Down
15 changes: 9 additions & 6 deletions tests/copilot_usage/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def test_summary_invalid_path() -> None:


def test_summary_error_handling(tmp_path: Path, monkeypatch: Any) -> None:
"""Exercise the except-Exception branch (lines 77-79) in summary."""
"""OSError in get_all_sessions produces a friendly error message."""

def _exploding_sessions(_base: Path | None = None) -> list[object]:
msg = "disk on fire"
Expand All @@ -279,6 +279,7 @@ def _exploding_sessions(_base: Path | None = None) -> list[object]:
result = runner.invoke(main, ["summary", "--path", str(tmp_path)])
assert result.exit_code != 0
assert "disk on fire" in result.output
assert "Traceback" not in (result.output or "")


def test_session_no_sessions(tmp_path: Path, monkeypatch: Any) -> None:
Expand Down Expand Up @@ -319,7 +320,7 @@ def _fake_discover(_base: Path | None = None) -> list[Path]:


def test_session_error_handling(tmp_path: Path, monkeypatch: Any) -> None:
"""Trigger an exception in session detail → friendly error (lines 129-131)."""
"""PermissionError in discover_sessions produces a friendly error message."""

def _exploding_discover(_base: Path | None = None) -> list[Path]:
msg = "permission denied"
Expand Down Expand Up @@ -387,31 +388,33 @@ def test_cost_zero_multiplier_model(tmp_path: Path) -> None:


def test_cost_error_handling(tmp_path: Path, monkeypatch: Any) -> None:
"""Exercise the except-Exception branch (lines 226-228) in cost."""
"""OSError in get_all_sessions produces a friendly error message."""

def _exploding_sessions(_base: Path | None = None) -> list[object]:
msg = "cost explosion"
raise RuntimeError(msg)
raise OSError(msg)

monkeypatch.setattr("copilot_usage.cli.get_all_sessions", _exploding_sessions)
runner = CliRunner()
result = runner.invoke(main, ["cost", "--path", str(tmp_path)])
assert result.exit_code != 0
assert "cost explosion" in result.output
assert "Traceback" not in (result.output or "")


def test_live_error_handling(tmp_path: Path, monkeypatch: Any) -> None:
"""Exercise the except-Exception branch (lines 248-250) in live."""
"""OSError in get_all_sessions produces a friendly error message."""

def _exploding_sessions(_base: Path | None = None) -> list[object]:
msg = "live explosion"
raise RuntimeError(msg)
raise OSError(msg)

monkeypatch.setattr("copilot_usage.cli.get_all_sessions", _exploding_sessions)
runner = CliRunner()
result = runner.invoke(main, ["live", "--path", str(tmp_path)])
assert result.exit_code != 0
assert "live explosion" in result.output
assert "Traceback" not in (result.output or "")


# ---------------------------------------------------------------------------
Expand Down
Loading