Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/copilot_usage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,21 @@ def session(ctx: click.Context, session_id: str, path: Path | None) -> None:
click.echo("No sessions found.", err=True)
sys.exit(1)

# Parse all sessions and find the one matching by prefix
# Fast path: skip directories that clearly cannot match the prefix.
# Only apply the pre-filter on UUID-shaped directory names (36 chars
# with 4 dashes), where the directory name IS the session ID.
# Non-UUID dirs (e.g. test fixtures) always need a full parse.
available: list[str] = []
for events_path in event_paths:
dir_name = events_path.parent.name
is_uuid_dir = len(dir_name) == 36 and dir_name.count("-") == 4
if (
len(session_id) >= 4
and is_uuid_dir
and not dir_name.startswith(session_id)
):
available.append(dir_name[:8])
continue
events = parse_events(events_path)
if not events:
continue
Expand Down
146 changes: 145 additions & 1 deletion tests/copilot_usage/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ def _write_session(
premium: int = 3,
output_tokens: int = 1500,
active: bool = False,
use_full_uuid_dir: bool = False,
) -> Path:
"""Create a minimal events.jsonl file inside *base*/<dir>/."""
session_dir = base / session_id[:8]
session_dir = base / (session_id if use_full_uuid_dir else session_id[:8])
session_dir.mkdir(parents=True, exist_ok=True)

events: list[dict[str, Any]] = [
Expand Down Expand Up @@ -899,3 +900,146 @@ def _fake_read(timeout: float = 0.5) -> str | None: # noqa: ARG001
assert result.exit_code == 0
# _show_session_by_index called at least twice: initial '1' + auto-refresh
assert len(show_detail_calls) >= 2


# ---------------------------------------------------------------------------
# Issue #138 — fast pre-filter on directory name
# ---------------------------------------------------------------------------


def test_session_prefilter_skips_non_matching_dirs(
tmp_path: Path, monkeypatch: Any
) -> None:
"""parse_events is only called for the matching directory when prefix ≥ 4 chars.

Creates ≥ 5 UUID-named sessions and verifies the pre-filter skips parsing
directories whose names don't start with the requested prefix.
"""
uuids = [
"aaaaaaaa-1111-1111-1111-111111111111",
"bbbbbbbb-2222-2222-2222-222222222222",
"cccccccc-3333-3333-3333-333333333333",
"dddddddd-4444-4444-4444-444444444444",
"eeeeeeee-5555-5555-5555-555555555555",
]
target = uuids[2] # cccccccc-...

for uid in uuids:
_write_session(tmp_path, uid, use_full_uuid_dir=True)

def _fake_discover(_base: Path | None = None) -> list[Path]:
return sorted(
tmp_path.glob("*/events.jsonl"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)

monkeypatch.setattr("copilot_usage.cli.discover_sessions", _fake_discover)

parse_calls: list[Path] = []
original_parse = __import__(
"copilot_usage.parser", fromlist=["parse_events"]
).parse_events

def _tracking_parse(events_path: Path) -> list[Any]:
parse_calls.append(events_path)
return original_parse(events_path)

monkeypatch.setattr("copilot_usage.cli.parse_events", _tracking_parse)

runner = CliRunner()
result = runner.invoke(main, ["session", "cccccccc"])
assert result.exit_code == 0
assert "cccccccc" in result.output

# Only the matching directory should have been parsed
assert len(parse_calls) == 1
assert parse_calls[0].parent.name == target


def test_session_prefilter_short_prefix_parses_all(
tmp_path: Path, monkeypatch: Any
) -> None:
"""Short prefixes (< 4 chars) bypass the pre-filter and parse all sessions."""
uuids = [
"ab111111-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
"ab222222-bbbb-bbbb-bbbb-bbbbbbbbbbbb",
"cd333333-cccc-cccc-cccc-cccccccccccc",
]
for uid in uuids:
_write_session(tmp_path, uid, use_full_uuid_dir=True)

def _fake_discover(_base: Path | None = None) -> list[Path]:
# Sort reverse-alphabetically so cd333… (non-matching) is visited
# before ab… dirs, proving the pre-filter didn't skip it.
return sorted(
tmp_path.glob("*/events.jsonl"),
key=lambda p: p.parent.name,
reverse=True,
)

monkeypatch.setattr("copilot_usage.cli.discover_sessions", _fake_discover)

parse_calls: list[Path] = []
original_parse = __import__(
"copilot_usage.parser", fromlist=["parse_events"]
).parse_events

def _tracking_parse(events_path: Path) -> list[Any]:
parse_calls.append(events_path)
return original_parse(events_path)

monkeypatch.setattr("copilot_usage.cli.parse_events", _tracking_parse)

runner = CliRunner()
# "ab" is only 2 chars — pre-filter should NOT skip anything
result = runner.invoke(main, ["session", "ab"])
assert result.exit_code == 0

# The non-matching cd333… dir must have been parsed (no pre-filter applied).
assert len(parse_calls) >= 2
parsed_dirs = {p.parent.name for p in parse_calls}
assert "cd333333-cccc-cccc-cccc-cccccccccccc" in parsed_dirs


def test_session_prefilter_non_uuid_dirs_always_parsed(
tmp_path: Path, monkeypatch: Any
) -> None:
"""Non-UUID directory names are always parsed even with long prefix."""
session_dir = tmp_path / "corrupt-session"
session_dir.mkdir()
events: list[dict[str, Any]] = [
{
"type": "session.start",
"timestamp": "2025-01-15T10:00:00Z",
"data": {
"sessionId": "corrupt0-0000-0000-0000-000000000000",
"startTime": "2025-01-15T10:00:00Z",
"context": {"cwd": "/home/user"},
},
},
{
"type": "session.shutdown",
"timestamp": "2025-01-15T11:00:00Z",
"currentModel": "claude-sonnet-4",
"data": {
"shutdownType": "normal",
"totalPremiumRequests": 1,
"totalApiDurationMs": 100,
"modelMetrics": {},
},
},
]
with (session_dir / "events.jsonl").open("w") as fh:
for ev in events:
fh.write(json.dumps(ev) + "\n")

def _fake_discover(_base: Path | None = None) -> list[Path]:
return list(tmp_path.glob("*/events.jsonl"))

monkeypatch.setattr("copilot_usage.cli.discover_sessions", _fake_discover)

runner = CliRunner()
result = runner.invoke(main, ["session", "corrupt0"])
assert result.exit_code == 0
assert "corrupt0" in result.output
Loading