diff --git a/claude_code_log/renderer.py b/claude_code_log/renderer.py index 02e779e0..7a87fdee 100644 --- a/claude_code_log/renderer.py +++ b/claude_code_log/renderer.py @@ -2,7 +2,9 @@ """Render Claude transcript data to HTML format.""" import json +import os import re +import time from pathlib import Path from typing import List, Optional, Dict, Any, cast, TYPE_CHECKING @@ -13,12 +15,14 @@ import mistune from jinja2 import Environment, FileSystemLoader, select_autoescape from pygments import highlight # type: ignore[reportUnknownVariableType] -from pygments.lexers import get_lexer_for_filename, TextLexer # type: ignore[reportUnknownVariableType] +from pygments.lexers import TextLexer # type: ignore[reportUnknownVariableType] from pygments.formatters import HtmlFormatter # type: ignore[reportUnknownVariableType] from pygments.util import ClassNotFound # type: ignore[reportUnknownVariableType] from .models import ( TranscriptEntry, + AssistantTranscriptEntry, + SystemTranscriptEntry, SummaryTranscriptEntry, QueueOperationTranscriptEntry, ContentItem, @@ -38,6 +42,13 @@ should_use_as_session_starter, create_session_preview, ) +from .renderer_timings import ( + DEBUG_TIMING, + report_timing_statistics, + timing_stat, + set_timing_var, + log_timing, +) from .cache import get_library_version @@ -232,7 +243,9 @@ def block_code(code: str, info: Optional[str] = None) -> str: cssclass="highlight", wrapcode=True, ) - return str(highlight(code, lexer, formatter)) # type: ignore[reportUnknownArgumentType] + # Track Pygments timing if enabled + with timing_stat("_pygments_timings"): + return str(highlight(code, lexer, formatter)) # type: ignore[reportUnknownArgumentType] else: # No language hint, use default rendering return original_render(code, info) @@ -244,21 +257,23 @@ def block_code(code: str, info: Optional[str] = None) -> str: def render_markdown(text: str) -> str: """Convert markdown text to HTML using mistune with Pygments syntax highlighting.""" - # Configure mistune with GitHub-flavored markdown features - renderer = mistune.create_markdown( - plugins=[ - "strikethrough", - "footnotes", - "table", - "url", - "task_lists", - "def_list", - _create_pygments_plugin(), - ], - escape=False, # Don't escape HTML since we want to render markdown properly - hard_wrap=True, # Line break for newlines (checklists in Assistant messages) - ) - return str(renderer(text)) + # Track markdown rendering time if enabled + with timing_stat("_markdown_timings"): + # Configure mistune with GitHub-flavored markdown features + renderer = mistune.create_markdown( + plugins=[ + "strikethrough", + "footnotes", + "table", + "url", + "task_lists", + "def_list", + _create_pygments_plugin(), + ], + escape=False, # Don't escape HTML since we want to render markdown properly + hard_wrap=True, # Line break for newlines (checklists in Assistant messages) + ) + return str(renderer(text)) def extract_command_info(text_content: str) -> tuple[str, str, str]: @@ -372,9 +387,68 @@ def _highlight_code_with_pygments( Returns: HTML string with syntax-highlighted code """ + # PERFORMANCE FIX: Use Pygments' public API to build filename pattern mapping, avoiding filesystem I/O + # get_lexer_for_filename performs I/O operations (file existence checks, reading bytes) + # which causes severe slowdowns, especially on Windows with antivirus scanning + # Solution: Build a reverse mapping from filename patterns to lexer aliases using get_all_lexers() (done once) + import fnmatch + from pygments.lexers import get_lexer_by_name, get_all_lexers # type: ignore[reportUnknownVariableType] + + # Build pattern->alias mapping on first call (cached as function attribute) + # OPTIMIZATION: Create both direct extension lookup and full pattern cache + if not hasattr(_highlight_code_with_pygments, "_pattern_cache"): + pattern_cache: dict[str, str] = {} + extension_cache: dict[str, str] = {} # Fast lookup for simple *.ext patterns + + # Use public API: get_all_lexers() returns (name, aliases, patterns, mimetypes) tuples + for name, aliases, patterns, mimetypes in get_all_lexers(): # type: ignore[reportUnknownVariableType] + if aliases and patterns: + # Use first alias as the lexer name + lexer_alias = aliases[0] + # Map each filename pattern to this lexer alias + for pattern in patterns: + pattern_lower = pattern.lower() + pattern_cache[pattern_lower] = lexer_alias + # Extract simple extension patterns (*.ext) for fast lookup + if ( + pattern_lower.startswith("*.") + and "*" not in pattern_lower[2:] + and "?" not in pattern_lower[2:] + ): + ext = pattern_lower[2:] # Remove "*." + # Prefer first match for each extension + if ext not in extension_cache: + extension_cache[ext] = lexer_alias + + _highlight_code_with_pygments._pattern_cache = pattern_cache # type: ignore[attr-defined] + _highlight_code_with_pygments._extension_cache = extension_cache # type: ignore[attr-defined] + + # Get basename for matching (patterns are like "*.py") + basename = os.path.basename(file_path).lower() + try: - # Try to get lexer based on filename - lexer = get_lexer_for_filename(file_path, code) # type: ignore[reportUnknownVariableType] + # Get caches + pattern_cache = _highlight_code_with_pygments._pattern_cache # type: ignore[attr-defined] + extension_cache = _highlight_code_with_pygments._extension_cache # type: ignore[attr-defined] + + # OPTIMIZATION: Try fast extension lookup first (O(1) dict lookup) + lexer_alias = None + if "." in basename: + ext = basename.split(".")[-1] # Get last extension (handles .tar.gz, etc.) + lexer_alias = extension_cache.get(ext) + + # Fall back to pattern matching only if extension lookup failed + if lexer_alias is None: + for pattern, lex_alias in pattern_cache.items(): + if fnmatch.fnmatch(basename, pattern): + lexer_alias = lex_alias + break + + # Get lexer or use TextLexer as fallback + if lexer_alias: + lexer = get_lexer_by_name(lexer_alias, stripall=True) # type: ignore[reportUnknownVariableType] + else: + lexer = TextLexer() # type: ignore[reportUnknownVariableType] except ClassNotFound: # Fall back to plain text lexer lexer = TextLexer() # type: ignore[reportUnknownVariableType] @@ -387,8 +461,52 @@ def _highlight_code_with_pygments( linenostart=linenostart, ) - # Highlight the code - return str(highlight(code, lexer, formatter)) # type: ignore[reportUnknownArgumentType] + # Highlight the code with timing if enabled + with timing_stat("_pygments_timings"): + return str(highlight(code, lexer, formatter)) # type: ignore[reportUnknownArgumentType] + + +def _truncate_highlighted_preview(highlighted_html: str, max_lines: int) -> str: + """Truncate Pygments highlighted HTML to first N lines. + + HtmlFormatter(linenos="table") produces a single with two s: +
LINE_NUMS
+
CODE
+ + We truncate content within each
 tag to the first max_lines lines.
+
+    Args:
+        highlighted_html: Full Pygments-highlighted HTML
+        max_lines: Maximum number of lines to include in preview
+
+    Returns:
+        Truncated HTML with same structure but fewer lines
+    """
+
+    def truncate_pre_content(match: re.Match[str]) -> str:
+        """Truncate content inside a 
 tag to max_lines."""
+        prefix, content, suffix = match.groups()
+        lines = content.split("\n")
+        truncated = "\n".join(lines[:max_lines])
+        return prefix + truncated + suffix
+
+    # Truncate linenos 
 content (line numbers separated by newlines)
+    result = re.sub(
+        r'(
)(.*?)(
)', + truncate_pre_content, + highlighted_html, + flags=re.DOTALL, + ) + + # Truncate code
 content
+    result = re.sub(
+        r'(
]*>)(.*?)(
)', + truncate_pre_content, + result, + flags=re.DOTALL, + ) + + return result def format_read_tool_content(tool_use: ToolUseContent) -> str: # noqa: ARG001 @@ -977,10 +1095,11 @@ def format_tool_result_content( # Try to parse as Read tool result if file_path is provided if file_path and tool_name == "Read" and not has_images: parsed_result = _parse_read_tool_result(raw_content) + if parsed_result: code_content, system_reminder, line_offset = parsed_result - # Highlight code with Pygments using correct line offset + # Highlight code with Pygments using correct line offset (single call) highlighted_html = _highlight_code_with_pygments( code_content, file_path, linenostart=line_offset ) @@ -991,11 +1110,12 @@ def format_tool_result_content( # Make collapsible if content has more than 12 lines lines = code_content.split("\n") if len(lines) > 12: - # Get preview (first ~5 lines) - preview_lines = lines[:5] - preview_html = _highlight_code_with_pygments( - "\n".join(preview_lines), file_path, linenostart=line_offset - ) + # Extract preview from already-highlighted HTML to avoid double-highlighting + # HtmlFormatter(linenos="table") produces a single with two s: + # ...
LINE_NUMS
... + # ...
CODE
... + # We truncate content within each
 to first 5 lines
+                preview_html = _truncate_highlighted_preview(highlighted_html, 5)
 
                 result_parts.append(f"""
                 
@@ -1384,46 +1504,27 @@ def render_message_content(content: List[ContentItem], message_type: str) -> str elif type(item) is ToolUseContent or ( hasattr(item, "type") and item_type == "tool_use" ): - # Handle both ToolUseContent and Anthropic ToolUseBlock - # Convert Anthropic type to our format if necessary - if not isinstance(item, ToolUseContent): - # Create a ToolUseContent from Anthropic ToolUseBlock - tool_use_item = ToolUseContent( - type="tool_use", - id=getattr(item, "id", ""), - name=getattr(item, "name", ""), - input=getattr(item, "input", {}), - ) - else: - tool_use_item = item - rendered_parts.append(format_tool_use_content(tool_use_item)) # type: ignore + # Tool use items should not appear here - they are filtered out before this function + print( + "Warning: tool_use content should not be processed in render_message_content", + flush=True, + ) elif type(item) is ToolResultContent or ( hasattr(item, "type") and item_type == "tool_result" ): - # Handle both ToolResultContent and Anthropic types - if not isinstance(item, ToolResultContent): - # Convert from Anthropic type if needed - tool_result_item = ToolResultContent( - type="tool_result", - tool_use_id=getattr(item, "tool_use_id", ""), - content=getattr(item, "content", ""), - is_error=getattr(item, "is_error", False), - ) - else: - tool_result_item = item - rendered_parts.append(format_tool_result_content(tool_result_item)) # type: ignore + # Tool result items should not appear here - they are filtered out before this function + print( + "Warning: tool_result content should not be processed in render_message_content", + flush=True, + ) elif type(item) is ThinkingContent or ( hasattr(item, "type") and item_type == "thinking" ): - # Handle both ThinkingContent and Anthropic ThinkingBlock - if not isinstance(item, ThinkingContent): - # Convert from Anthropic type if needed - thinking_item = ThinkingContent( - type="thinking", thinking=getattr(item, "thinking", str(item)) - ) - else: - thinking_item = item - rendered_parts.append(format_thinking_content(thinking_item)) # type: ignore + # Thinking items should not appear here - they are filtered out before this function + print( + "Warning: thinking content should not be processed in render_message_content", + flush=True, + ) elif type(item) is ImageContent: rendered_parts.append(format_image_content(item)) # type: ignore @@ -2517,77 +2618,202 @@ def generate_html( combined_transcript_link: Optional[str] = None, ) -> str: """Generate HTML from transcript messages using Jinja2 templates.""" - if not title: - title = "Claude Transcript" + # Performance timing + t_start = time.time() + + with log_timing("Initialization", t_start): + if not title: + title = "Claude Transcript" # Deduplicate messages by (message_type, timestamp) # Messages with the exact same timestamp are duplicates by definition - # the differences (like IDE selection tags) are just logging artifacts - from claude_code_log.models import AssistantTranscriptEntry, SystemTranscriptEntry + with log_timing( + lambda: f"Deduplication ({len(deduplicated_messages)} messages)", t_start + ): + # Track seen (message_type, timestamp) pairs + seen: set[tuple[str, str]] = set() + deduplicated_messages: List[TranscriptEntry] = [] - # Track seen (message_type, timestamp) pairs - seen: set[tuple[str, str]] = set() - deduplicated_messages: List[TranscriptEntry] = [] + for message in messages: + # Get basic message type + message_type = getattr(message, "type", "unknown") - for message in messages: - # Get basic message type - message_type = getattr(message, "type", "unknown") - - # For system messages, include level to differentiate info/warning/error - if isinstance(message, SystemTranscriptEntry): - level = getattr(message, "level", "info") - message_type = f"system-{level}" + # For system messages, include level to differentiate info/warning/error + if isinstance(message, SystemTranscriptEntry): + level = getattr(message, "level", "info") + message_type = f"system-{level}" - # Get timestamp - timestamp = getattr(message, "timestamp", "") + # Get timestamp + timestamp = getattr(message, "timestamp", "") - # Create deduplication key - dedup_key = (message_type, timestamp) + # Create deduplication key + dedup_key = (message_type, timestamp) - # Keep only first occurrence - if dedup_key not in seen: - seen.add(dedup_key) - deduplicated_messages.append(message) + # Keep only first occurrence + if dedup_key not in seen: + seen.add(dedup_key) + deduplicated_messages.append(message) - messages = deduplicated_messages + messages = deduplicated_messages # Pre-process to find and attach session summaries - session_summaries: Dict[str, str] = {} - uuid_to_session: Dict[str, str] = {} - uuid_to_session_backup: Dict[str, str] = {} + with log_timing("Session summary processing", t_start): + session_summaries: Dict[str, str] = {} + uuid_to_session: Dict[str, str] = {} + uuid_to_session_backup: Dict[str, str] = {} + + # Build mapping from message UUID to session ID + for message in messages: + if hasattr(message, "uuid") and hasattr(message, "sessionId"): + message_uuid = getattr(message, "uuid", "") + session_id = getattr(message, "sessionId", "") + if message_uuid and session_id: + # There is often duplication, in that case we want to prioritise the assistant + # message because summaries are generated from Claude's (last) success message + if type(message) is AssistantTranscriptEntry: + uuid_to_session[message_uuid] = session_id + else: + uuid_to_session_backup[message_uuid] = session_id + + # Map summaries to sessions via leafUuid -> message UUID -> session ID + for message in messages: + if isinstance(message, SummaryTranscriptEntry): + leaf_uuid = message.leafUuid + if leaf_uuid in uuid_to_session: + session_summaries[uuid_to_session[leaf_uuid]] = message.summary + elif ( + leaf_uuid in uuid_to_session_backup + and uuid_to_session_backup[leaf_uuid] not in session_summaries + ): + session_summaries[uuid_to_session_backup[leaf_uuid]] = ( + message.summary + ) - # Build mapping from message UUID to session ID - for message in messages: - if hasattr(message, "uuid") and hasattr(message, "sessionId"): - message_uuid = getattr(message, "uuid", "") - session_id = getattr(message, "sessionId", "") - if message_uuid and session_id: - # There is often duplication, in that case we want to prioritise the assistant - # message because summaries are generated from Claude's (last) success message - if type(message) is AssistantTranscriptEntry: - uuid_to_session[message_uuid] = session_id + # Attach summaries to messages + for message in messages: + if hasattr(message, "sessionId"): + session_id = getattr(message, "sessionId", "") + if session_id in session_summaries: + setattr(message, "_session_summary", session_summaries[session_id]) + + # Process messages through the main rendering loop + template_messages, sessions, session_order = _process_messages_loop(messages) + + # Prepare session navigation data + session_nav: List[Dict[str, Any]] = [] + with log_timing( + lambda: f"Session navigation building ({len(session_nav)} sessions)", t_start + ): + for session_id in session_order: + session_info = sessions[session_id] + + # Format timestamp range + first_ts = session_info["first_timestamp"] + last_ts = session_info["last_timestamp"] + timestamp_range = "" + if first_ts and last_ts: + if first_ts == last_ts: + timestamp_range = format_timestamp(first_ts) else: - uuid_to_session_backup[message_uuid] = session_id + timestamp_range = ( + f"{format_timestamp(first_ts)} - {format_timestamp(last_ts)}" + ) + elif first_ts: + timestamp_range = format_timestamp(first_ts) - # Map summaries to sessions via leafUuid -> message UUID -> session ID - for message in messages: - if isinstance(message, SummaryTranscriptEntry): - leaf_uuid = message.leafUuid - if leaf_uuid in uuid_to_session: - session_summaries[uuid_to_session[leaf_uuid]] = message.summary - elif ( - leaf_uuid in uuid_to_session_backup - and uuid_to_session_backup[leaf_uuid] not in session_summaries - ): - session_summaries[uuid_to_session_backup[leaf_uuid]] = message.summary + # Format token usage summary + token_summary = "" + total_input = session_info["total_input_tokens"] + total_output = session_info["total_output_tokens"] + total_cache_creation = session_info["total_cache_creation_tokens"] + total_cache_read = session_info["total_cache_read_tokens"] + + if total_input > 0 or total_output > 0: + token_parts: List[str] = [] + if total_input > 0: + token_parts.append(f"Input: {total_input}") + if total_output > 0: + token_parts.append(f"Output: {total_output}") + if total_cache_creation > 0: + token_parts.append(f"Cache Creation: {total_cache_creation}") + if total_cache_read > 0: + token_parts.append(f"Cache Read: {total_cache_read}") + token_summary = "Token usage – " + " | ".join(token_parts) + + session_nav.append( + { + "id": session_id, + "summary": session_info["summary"], + "timestamp_range": timestamp_range, + "first_timestamp": first_ts, + "last_timestamp": last_ts, + "message_count": session_info["message_count"], + "first_user_message": session_info["first_user_message"] + if session_info["first_user_message"] != "" + else "[No user message found in session.]", + "token_summary": token_summary, + } + ) - # Attach summaries to messages - for message in messages: - if hasattr(message, "sessionId"): - session_id = getattr(message, "sessionId", "") - if session_id in session_summaries: - setattr(message, "_session_summary", session_summaries[session_id]) + # Identify and mark paired messages (command+output, tool_use+tool_result, etc.) + with log_timing("Identify message pairs", t_start): + _identify_message_pairs(template_messages) + + # Reorder messages so pairs are adjacent while preserving chronological order + with log_timing("Reorder paired messages", t_start): + template_messages = _reorder_paired_messages(template_messages) + + # Mark messages that have children for fold/unfold controls + with log_timing("Mark messages with children", t_start): + _mark_messages_with_children(template_messages) + + # Render template + with log_timing("Template environment setup", t_start): + env = _get_template_environment() + template = env.get_template("transcript.html") + + with log_timing(lambda: f"Template rendering ({len(html_output)} chars)", t_start): + html_output = str( + template.render( + title=title, + messages=template_messages, + sessions=session_nav, + combined_transcript_link=combined_transcript_link, + library_version=get_library_version(), + ) + ) + return html_output + + +def _process_messages_loop( + messages: List[TranscriptEntry], +) -> tuple[ + List[TemplateMessage], + Dict[str, Dict[str, Any]], # sessions + List[str], # session_order +]: + """Process messages through the main rendering loop. + + This function handles the core message processing logic: + - Processes each message into template-friendly format + - Tracks sessions and token usage + - Handles message deduplication and hierarchy + - Collects timing statistics + + Note: Tool use context must be built before calling this function via + _define_tool_use_context() + + Args: + messages: List of transcript entries to process + + Returns: + Tuple containing: + - template_messages: Processed messages ready for template rendering + - sessions: Session metadata dict mapping session_id to info + - session_order: List of session IDs in chronological order + """ # Group messages by session and collect session info for navigation sessions: Dict[str, Dict[str, Any]] = {} session_order: List[str] = [] @@ -2598,34 +2824,9 @@ def generate_html( # Track which messages should show token usage (first occurrence of each requestId) show_tokens_for_message: set[str] = set() - # Build mapping of tool_use_id to tool info for specialized tool result rendering - tool_use_context: Dict[str, Dict[str, Any]] = {} - for message in messages: - if hasattr(message, "message") and hasattr(message.message, "content"): # type: ignore - content = message.message.content # type: ignore - if isinstance(content, list): - for item in content: # type: ignore[reportUnknownVariableType] - # Check if it's a tool_use item - if hasattr(item, "type") and hasattr(item, "id"): # type: ignore[reportUnknownArgumentType] - item_type = getattr(item, "type", None) # type: ignore[reportUnknownArgumentType] - if item_type == "tool_use": - tool_id = getattr(item, "id", "") # type: ignore[reportUnknownArgumentType] - tool_name = getattr(item, "name", "") # type: ignore[reportUnknownArgumentType] - tool_input = getattr(item, "input", {}) # type: ignore[reportUnknownArgumentType] - if tool_id: - tool_ctx: Dict[str, Any] = { - "name": tool_name, - "input": tool_input, - } - # For Task tools, store the prompt for comparison - if tool_name == "Task" and isinstance(tool_input, dict): - prompt_value = tool_input.get("prompt", "") # type: ignore[reportUnknownVariableType, reportUnknownMemberType] - tool_ctx["prompt"] = ( - prompt_value - if isinstance(prompt_value, str) - else "" - ) - tool_use_context[tool_id] = tool_ctx + # Build mapping of tool_use_id to ToolUseContent for specialized tool result rendering + # This will be populated inline as we encounter tool_use items during message processing + tool_use_context: Dict[str, ToolUseContent] = {} # Process messages into template-friendly format template_messages: List[TemplateMessage] = [] @@ -2642,8 +2843,27 @@ def generate_html( # Maps raw content -> (template_messages index, message_id, type: "task" or "assistant") content_map: Dict[str, tuple[int, str, str]] = {} - for message in messages: + # Per-message timing tracking + message_timings: List[ + tuple[float, str, int, str] + ] = [] # (duration, message_type, index, uuid) + + # Track expensive operations + markdown_timings: List[tuple[float, str]] = [] # (duration, context_uuid) + pygments_timings: List[tuple[float, str]] = [] # (duration, context_uuid) + + # Initialize timing tracking + set_timing_var("_markdown_timings", markdown_timings) + set_timing_var("_pygments_timings", pygments_timings) + set_timing_var("_current_msg_uuid", "") + + for msg_idx, message in enumerate(messages): + msg_start_time = time.time() if DEBUG_TIMING else 0.0 message_type = message.type + msg_uuid = getattr(message, "uuid", f"no-uuid-{msg_idx}") + + # Update current message UUID for timing tracking + set_timing_var("_current_msg_uuid", msg_uuid) # Skip summary messages - they should already be attached to their sessions if isinstance(message, SummaryTranscriptEntry): @@ -3035,32 +3255,35 @@ def generate_html( if isinstance(tool_item, ToolUseContent) or item_type == "tool_use": # Convert Anthropic type to our format if necessary if not isinstance(tool_item, ToolUseContent): - tool_use_converted = ToolUseContent( + tool_use = ToolUseContent( type="tool_use", id=getattr(tool_item, "id", ""), name=getattr(tool_item, "name", ""), input=getattr(tool_item, "input", {}), ) else: - tool_use_converted = tool_item + tool_use = tool_item - tool_content_html = format_tool_use_content(tool_use_converted) - escaped_name = escape_html(tool_use_converted.name) - escaped_id = escape_html(tool_use_converted.id) - item_tool_use_id = tool_use_converted.id + tool_content_html = format_tool_use_content(tool_use) + escaped_name = escape_html(tool_use.name) + escaped_id = escape_html(tool_use.id) + item_tool_use_id = tool_use.id tool_title_hint = f"ID: {escaped_id}" + # Populate tool_use_context for later use when processing tool results + tool_use_context[item_tool_use_id] = tool_use + # Get summary for header (description or filepath) - summary = get_tool_summary(tool_use_converted) + summary = get_tool_summary(tool_use) # Set message_type (for CSS/logic) and message_title (for display) tool_message_type = "tool_use" - if tool_use_converted.name == "TodoWrite": + if tool_use.name == "TodoWrite": tool_message_title = "πŸ“ Todo List" - elif tool_use_converted.name == "Task": + elif tool_use.name == "Task": # Special handling for Task tool: show subagent_type and description - subagent_type = tool_use_converted.input.get("subagent_type", "") - description = tool_use_converted.input.get("description", "") + subagent_type = tool_use.input.get("subagent_type", "") + description = tool_use.input.get("description", "") escaped_subagent = ( escape_html(subagent_type) if subagent_type else "" ) @@ -3075,14 +3298,14 @@ def generate_html( tool_message_title = f"πŸ”§ {escaped_name} ({escaped_subagent})" else: tool_message_title = f"πŸ”§ {escaped_name}" - elif tool_use_converted.name in ("Edit", "Write"): + elif tool_use.name in ("Edit", "Write"): # Use πŸ“ icon for Edit/Write if summary: escaped_summary = escape_html(summary) tool_message_title = f"πŸ“ {escaped_name} {escaped_summary}" else: tool_message_title = f"πŸ“ {escaped_name}" - elif tool_use_converted.name == "Read": + elif tool_use.name == "Read": # Use πŸ“„ icon for Read if summary: escaped_summary = escape_html(summary) @@ -3112,14 +3335,20 @@ def generate_html( result_file_path: Optional[str] = None result_tool_name: Optional[str] = None if tool_result_converted.tool_use_id in tool_use_context: - tool_ctx = tool_use_context[tool_result_converted.tool_use_id] - result_tool_name = tool_ctx.get("name") - if result_tool_name in ( - "Read", - "Edit", - "Write", - ) and "file_path" in tool_ctx.get("input", {}): - result_file_path = tool_ctx["input"]["file_path"] + tool_use_from_ctx = tool_use_context[ + tool_result_converted.tool_use_id + ] + result_tool_name = tool_use_from_ctx.name + if ( + result_tool_name + in ( + "Read", + "Edit", + "Write", + ) + and "file_path" in tool_use_from_ctx.input + ): + result_file_path = tool_use_from_ctx.input["file_path"] tool_content_html = format_tool_result_content( tool_result_converted, @@ -3249,79 +3478,22 @@ def generate_html( pending_dedup = None # Reset for next iteration - # Prepare session navigation data - session_nav: List[Dict[str, Any]] = [] - for session_id in session_order: - session_info = sessions[session_id] - - # Format timestamp range - first_ts = session_info["first_timestamp"] - last_ts = session_info["last_timestamp"] - timestamp_range = "" - if first_ts and last_ts: - if first_ts == last_ts: - timestamp_range = format_timestamp(first_ts) - else: - timestamp_range = ( - f"{format_timestamp(first_ts)} - {format_timestamp(last_ts)}" - ) - elif first_ts: - timestamp_range = format_timestamp(first_ts) - - # Format token usage summary - token_summary = "" - total_input = session_info["total_input_tokens"] - total_output = session_info["total_output_tokens"] - total_cache_creation = session_info["total_cache_creation_tokens"] - total_cache_read = session_info["total_cache_read_tokens"] + # Track message timing + if DEBUG_TIMING: + msg_duration = time.time() - msg_start_time + message_timings.append((msg_duration, message_type, msg_idx, msg_uuid)) - if total_input > 0 or total_output > 0: - token_parts: List[str] = [] - if total_input > 0: - token_parts.append(f"Input: {total_input}") - if total_output > 0: - token_parts.append(f"Output: {total_output}") - if total_cache_creation > 0: - token_parts.append(f"Cache Creation: {total_cache_creation}") - if total_cache_read > 0: - token_parts.append(f"Cache Read: {total_cache_read}") - token_summary = "Token usage – " + " | ".join(token_parts) - - session_nav.append( - { - "id": session_id, - "summary": session_info["summary"], - "timestamp_range": timestamp_range, - "first_timestamp": first_ts, - "last_timestamp": last_ts, - "message_count": session_info["message_count"], - "first_user_message": session_info["first_user_message"] - if session_info["first_user_message"] != "" - else "[No user message found in session.]", - "token_summary": token_summary, - } + # Report loop statistics + if DEBUG_TIMING: + report_timing_statistics( + message_timings, + [("Markdown", markdown_timings), ("Pygments", pygments_timings)], ) - # Identify and mark paired messages (command+output, tool_use+tool_result, etc.) - _identify_message_pairs(template_messages) - - # Reorder messages so pairs are adjacent while preserving chronological order - template_messages = _reorder_paired_messages(template_messages) - - # Mark messages that have children for fold/unfold controls - _mark_messages_with_children(template_messages) - - # Render template - env = _get_template_environment() - template = env.get_template("transcript.html") - return str( - template.render( - title=title, - messages=template_messages, - sessions=session_nav, - combined_transcript_link=combined_transcript_link, - library_version=get_library_version(), - ) + return ( + template_messages, + sessions, + session_order, ) diff --git a/claude_code_log/renderer_timings.py b/claude_code_log/renderer_timings.py new file mode 100644 index 00000000..fb5111cb --- /dev/null +++ b/claude_code_log/renderer_timings.py @@ -0,0 +1,158 @@ +"""Timing utilities for renderer performance profiling. + +This module provides timing and performance profiling utilities for the renderer. +All timing-related configuration and functionality is centralized here. +""" + +import os +import time +from contextlib import contextmanager +from typing import List, Tuple, Iterator, Any, Dict, Callable, Union, Optional + +# Performance debugging - enabled via CLAUDE_CODE_LOG_DEBUG_TIMING environment variable +# Set to "1", "true", or "yes" to enable timing output +DEBUG_TIMING = os.getenv("CLAUDE_CODE_LOG_DEBUG_TIMING", "").lower() in ( + "1", + "true", + "yes", +) + +# Global timing data storage +_timing_data: Dict[str, Any] = {} + + +def set_timing_var(name: str, value: Any) -> None: + """Set a timing variable in the global timing data dict. + + Args: + name: Variable name (e.g., "_markdown_timings", "_pygments_timings", "_current_msg_uuid") + value: Value to set + """ + if DEBUG_TIMING: + _timing_data[name] = value + + +@contextmanager +def log_timing( + phase: Union[str, Callable[[], str]], + t_start: Optional[float] = None, +) -> Iterator[None]: + """Context manager for logging phase timing. + + Args: + phase: Phase name (static string) or callable returning phase name (for dynamic names) + t_start: Optional start time for calculating total elapsed time + + Example: + # Static phase name + with log_timing("Initialization", t_start): + setup_code() + + # Dynamic phase name (evaluated at end) + with log_timing(lambda: f"Processing ({len(items)} items)", t_start): + items = process() + """ + if not DEBUG_TIMING: + yield + return + + t_phase_start = time.time() + + try: + yield + finally: + t_now = time.time() + phase_time = t_now - t_phase_start + + # Evaluate phase name (call if callable, use directly if string) + phase_name = phase() if callable(phase) else phase + + # Calculate total time if t_start provided + if t_start is not None: + total_time = t_now - t_start + print( + f"[TIMING] {phase_name:40s} {phase_time:8.3f}s (total: {total_time:8.3f}s)", + flush=True, + ) + else: + print( + f"[TIMING] {phase_name:40s} {phase_time:8.3f}s", + flush=True, + ) + + # Update last timing checkpoint + _timing_data["_t_last"] = t_now + + +@contextmanager +def timing_stat(list_name: str) -> Iterator[None]: + """Context manager for tracking timing statistics. + + Args: + list_name: Name of the timing list to append to + (e.g., "_markdown_timings", "_pygments_timings") + + Example: + with timing_stat("_pygments_timings"): + result = expensive_operation() + """ + if not DEBUG_TIMING: + yield + return + + t_start = time.time() + try: + yield + finally: + duration = time.time() - t_start + if list_name in _timing_data: + msg_uuid = _timing_data.get("_current_msg_uuid", "") + _timing_data[list_name].append((duration, msg_uuid)) + + +def report_timing_statistics( + message_timings: List[Tuple[float, str, int, str]], + operation_timings: List[Tuple[str, List[Tuple[float, str]]]], +) -> None: + """Report timing statistics for message rendering. + + Args: + message_timings: List of (duration, message_type, index, uuid) tuples + operation_timings: List of (name, timings) tuples where timings is a list of (duration, uuid) + e.g., [("Markdown", markdown_timings), ("Pygments", pygments_timings)] + """ + if not message_timings: + return + + # Sort by duration descending + sorted_timings = sorted(message_timings, key=lambda x: x[0], reverse=True) + + # Calculate statistics + total_msg_time = sum(t[0] for t in message_timings) + avg_time = total_msg_time / len(message_timings) + + # Report slowest messages + print("\n[TIMING] Loop statistics:", flush=True) + print(f"[TIMING] Total messages: {len(message_timings)}", flush=True) + print(f"[TIMING] Average time per message: {avg_time * 1000:.1f}ms", flush=True) + print("[TIMING] Slowest 10 messages:", flush=True) + for duration, msg_type, idx, uuid in sorted_timings[:10]: + print( + f"[TIMING] Message {uuid} (#{idx}, {msg_type}): {duration * 1000:.1f}ms", + flush=True, + ) + + # Report operation-specific statistics + for operation_name, timings in operation_timings: + if timings: + sorted_ops = sorted(timings, key=lambda x: x[0], reverse=True) + total_time = sum(t[0] for t in timings) + print(f"\n[TIMING] {operation_name} rendering:", flush=True) + print(f"[TIMING] Total operations: {len(timings)}", flush=True) + print(f"[TIMING] Total time: {total_time:.3f}s", flush=True) + print("[TIMING] Slowest 10 operations:", flush=True) + for duration, uuid in sorted_ops[:10]: + print( + f"[TIMING] {uuid}: {duration * 1000:.1f}ms", + flush=True, + ) diff --git a/dev-docs/FOLD_STATE_DIAGRAM.md b/dev-docs/FOLD_STATE_DIAGRAM.md index 31556320..fd3df528 100644 --- a/dev-docs/FOLD_STATE_DIAGRAM.md +++ b/dev-docs/FOLD_STATE_DIAGRAM.md @@ -43,52 +43,26 @@ The fold bar has two buttons with three possible states: ## State Transitions ``` - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ State A (β–Ά β–Άβ–Ά) β”‚ - β”‚ Nothing visible β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ - Click β–Ά β”‚ β”‚ Click β–Άβ–Ά - (unfold 1) β”‚ β”‚ (unfold all) - β–Ό β–Ό - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ State B β”‚ β”‚ State B β”‚ - β”‚ (β–Ό β–Άβ–Ά) │◄─── (β–Ό β–Άβ–Ά) β”‚ - β”‚ First β”‚ β”‚ First β”‚ - β”‚ level β”‚ β”‚ level β”‚ - β”‚ visible β”‚ β”‚ visible β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ - Click β–Άβ–Ά β”‚ β”‚ Click β–Ό - (unfold all) β”‚ β”‚ (fold 1) - β–Ό β–Ό - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ State C β”‚ β”‚ State A β”‚ - β”‚ (β–Ό β–Όβ–Ό) β”‚ β”‚ (β–Ά β–Άβ–Ά) β”‚ - β”‚ All levels β”‚ β”‚ Nothing β”‚ - β”‚ visible β”‚ β”‚ visible β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ - Click β–Όβ–Ό β”‚ β”‚ Click β–Άβ–Ά - (fold all) β”‚ β”‚ (unfold all) - β–Ό β–Ό - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ State B β”‚ β”‚ State C β”‚ - β”‚ (β–Ό β–Άβ–Ά) β”‚ β”‚ (β–Ό β–Όβ–Ό) β”‚ - β”‚ First β”‚ β”‚ All levels β”‚ - β”‚ level β”‚ β”‚ visible β”‚ - β”‚ visible β”‚ β”‚ β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ - Click β–Ό β”‚ - (fold 1) β”‚ - β–Ό - β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” - β”‚ State A β”‚ - β”‚ (β–Ά β–Άβ–Ά) β”‚ - β”‚ Nothing β”‚ - β”‚ visible β”‚ - β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ State A (β–Ά / β–Άβ–Ά) │◄────────┐ + β”‚ β”‚ Nothing visible β”‚ β”‚ + β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ Click β–Ά β”‚ β”‚ Click β–Άβ–Ά β”‚ + β”‚ (unfold 1) β”‚ β”‚ (unfold all) β”‚ + β”‚ β–Ό β–Ό β”‚ + β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ + β”‚ β”‚ State B β”‚ β”‚ State C β”‚ β”‚ + β”‚ β”‚ (β–Ό / β–Άβ–Ά) β”‚ β”‚ (β–Ό / β–Όβ–Ό) β”‚ β”‚ + β”‚ β”‚ First β”‚ β”‚ All β”‚ β”‚ + β”‚ β”‚ level β”‚ β”‚ levels β”‚ β”‚ + β”‚ β”‚ visible β”‚ β”‚ visible β”‚ β”‚ + β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ + β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ + β”‚ Click β–Όβ”‚ └── β–Άβ–Ά ↔ β–Όβ–Ό β”€β”€β”˜ β”‚Click β–Ό β”‚ + β”‚ β”‚ (unfold all / fold 1) β”‚ β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + (fold all) (fold all) ``` ## Simplified Transition Table diff --git a/test/test_data/edit_tool.jsonl b/test/test_data/edit_tool.jsonl new file mode 100644 index 00000000..896e9d44 --- /dev/null +++ b/test/test_data/edit_tool.jsonl @@ -0,0 +1,2 @@ +{"parentUuid":"49306c7f-62f4-41ce-b0e7-b67a891148b7","isSidechain":false,"userType":"external","cwd":"e:\\Workspace\\src\\github\\claude-code-log","sessionId":"88a8d761-7b9a-4bf1-a8ca-c1febe6bf358","version":"2.0.46","gitBranch":"dev/fold-ui-horizontal","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_01A76saVFjpeVZQmepohx5gy","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_01PhNkTBr17qvahqhCdkUkvM","name":"Edit","input":{"file_path":"e:\\Workspace\\src\\github\\claude-code-log\\claude_code_log\\renderer_timings.py","old_string":"def set_timing_var(name: str, value: Any) -> None:\n \"\"\"Set a timing variable in the global timing data dict.\n\n Args:\n name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\")\n value: Value to set\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[name] = value\n\n\ndef set_current_msg_uuid(uuid: str) -> None:\n \"\"\"Set the current message UUID for timing tracking.\n\n Args:\n uuid: Message UUID\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[\"_current_msg_uuid\"] = uuid","new_string":"def set_timing_var(name: str, value: Any) -> None:\n \"\"\"Set a timing variable in the global timing data dict.\n\n Args:\n name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\", \"_current_msg_uuid\")\n value: Value to set\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[name] = value"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":0,"cache_creation_input_tokens":10391,"cache_read_input_tokens":102802,"cache_creation":{"ephemeral_5m_input_tokens":10391,"ephemeral_1h_input_tokens":0},"output_tokens":526,"service_tier":"standard"}},"requestId":"req_011CVRRU3KTqNrUhZ9K35Zd2","type":"assistant","uuid":"e450d9e3-ccb9-4595-a7cb-e13f13bab5a0","timestamp":"2025-11-23T20:06:04.851Z"} +{"parentUuid":"e450d9e3-ccb9-4595-a7cb-e13f13bab5a0","isSidechain":false,"userType":"external","cwd":"e:\\Workspace\\src\\github\\claude-code-log","sessionId":"88a8d761-7b9a-4bf1-a8ca-c1febe6bf358","version":"2.0.46","gitBranch":"dev/fold-ui-horizontal","type":"user","message":{"role":"user","content":[{"tool_use_id":"toolu_01PhNkTBr17qvahqhCdkUkvM","type":"tool_result","content":"The file e:\\Workspace\\src\\github\\claude-code-log\\claude_code_log\\renderer_timings.py has been updated. Here's the result of running `cat -n` on a snippet of the edited file:\n 15β†’# Global timing data storage\n 16β†’_timing_data: Dict[str, Any] = {}\n 17β†’\n 18β†’\n 19β†’def set_timing_var(name: str, value: Any) -> None:\n 20β†’ \"\"\"Set a timing variable in the global timing data dict.\n 21β†’\n 22β†’ Args:\n 23β†’ name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\", \"_current_msg_uuid\")\n 24β†’ value: Value to set\n 25β†’ \"\"\"\n 26β†’ if DEBUG_TIMING:\n 27β†’ _timing_data[name] = value\n 28β†’\n 29β†’\n 30β†’@contextmanager\n 31β†’def timing_stat(list_name: str) -> Iterator[None]:"}]},"uuid":"46b5acca-1860-4ae3-be78-ed3537a81d8b","timestamp":"2025-11-23T20:06:14.160Z","toolUseResult":{"filePath":"e:\\Workspace\\src\\github\\claude-code-log\\claude_code_log\\renderer_timings.py","oldString":"def set_timing_var(name: str, value: Any) -> None:\n \"\"\"Set a timing variable in the global timing data dict.\n\n Args:\n name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\")\n value: Value to set\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[name] = value\n\n\ndef set_current_msg_uuid(uuid: str) -> None:\n \"\"\"Set the current message UUID for timing tracking.\n\n Args:\n uuid: Message UUID\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[\"_current_msg_uuid\"] = uuid","newString":"def set_timing_var(name: str, value: Any) -> None:\n \"\"\"Set a timing variable in the global timing data dict.\n\n Args:\n name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\", \"_current_msg_uuid\")\n value: Value to set\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[name] = value","originalFile":"\"\"\"Timing utilities for renderer performance profiling.\"\"\"\n\nimport os\nimport time\nfrom contextlib import contextmanager\nfrom typing import List, Tuple, Iterator, Any, Dict\n\n# Performance debugging\nDEBUG_TIMING = os.getenv(\"CLAUDE_CODE_LOG_DEBUG_TIMING\", \"\").lower() in (\n \"1\",\n \"true\",\n \"yes\",\n)\n\n# Global timing data storage\n_timing_data: Dict[str, Any] = {}\n\n\ndef set_timing_var(name: str, value: Any) -> None:\n \"\"\"Set a timing variable in the global timing data dict.\n\n Args:\n name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\")\n value: Value to set\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[name] = value\n\n\ndef set_current_msg_uuid(uuid: str) -> None:\n \"\"\"Set the current message UUID for timing tracking.\n\n Args:\n uuid: Message UUID\n \"\"\"\n if DEBUG_TIMING:\n _timing_data[\"_current_msg_uuid\"] = uuid\n\n\n@contextmanager\ndef timing_stat(list_name: str) -> Iterator[None]:\n \"\"\"Context manager for tracking timing statistics.\n\n Args:\n list_name: Name of the timing list to append to\n (e.g., \"_markdown_timings\", \"_pygments_timings\")\n\n Example:\n with timing_stat(\"_pygments_timings\"):\n result = expensive_operation()\n \"\"\"\n if not DEBUG_TIMING:\n yield\n return\n\n t_start = time.time()\n try:\n yield\n finally:\n duration = time.time() - t_start\n if list_name in _timing_data:\n msg_uuid = _timing_data.get(\"_current_msg_uuid\", \"\")\n _timing_data[list_name].append((duration, msg_uuid))\n\n\ndef report_timing_statistics(\n message_timings: List[Tuple[float, str, int, str]],\n markdown_timings: List[Tuple[float, str]],\n pygments_timings: List[Tuple[float, str]],\n) -> None:\n \"\"\"Report timing statistics for message rendering.\n\n Args:\n message_timings: List of (duration, message_type, index, uuid) tuples\n markdown_timings: List of (duration, uuid) tuples for markdown rendering\n pygments_timings: List of (duration, uuid) tuples for Pygments highlighting\n \"\"\"\n if not message_timings:\n return\n\n # Sort by duration descending\n sorted_timings = sorted(message_timings, key=lambda x: x[0], reverse=True)\n\n # Calculate statistics\n total_msg_time = sum(t[0] for t in message_timings)\n avg_time = total_msg_time / len(message_timings)\n\n # Report slowest messages\n print(\"\\n[TIMING] Loop statistics:\", flush=True)\n print(f\"[TIMING] Total messages: {len(message_timings)}\", flush=True)\n print(f\"[TIMING] Average time per message: {avg_time * 1000:.1f}ms\", flush=True)\n print(\"[TIMING] Slowest 10 messages:\", flush=True)\n for duration, msg_type, idx, uuid in sorted_timings[:10]:\n print(\n f\"[TIMING] Message {uuid} (#{idx}, {msg_type}): {duration * 1000:.1f}ms\",\n flush=True,\n )\n\n # Report markdown rendering statistics\n if markdown_timings:\n sorted_markdown = sorted(markdown_timings, key=lambda x: x[0], reverse=True)\n total_markdown_time = sum(t[0] for t in markdown_timings)\n print(f\"\\n[TIMING] Markdown rendering:\", flush=True)\n print(f\"[TIMING] Total operations: {len(markdown_timings)}\", flush=True)\n print(f\"[TIMING] Total time: {total_markdown_time:.3f}s\", flush=True)\n print(f\"[TIMING] Slowest 10 operations:\", flush=True)\n for duration, uuid in sorted_markdown[:10]:\n print(\n f\"[TIMING] {uuid}: {duration * 1000:.1f}ms\",\n flush=True,\n )\n\n # Report Pygments highlighting statistics\n if pygments_timings:\n sorted_pygments = sorted(pygments_timings, key=lambda x: x[0], reverse=True)\n total_pygments_time = sum(t[0] for t in pygments_timings)\n print(f\"\\n[TIMING] Pygments highlighting:\", flush=True)\n print(f\"[TIMING] Total operations: {len(pygments_timings)}\", flush=True)\n print(f\"[TIMING] Total time: {total_pygments_time:.3f}s\", flush=True)\n print(f\"[TIMING] Slowest 10 operations:\", flush=True)\n for duration, uuid in sorted_pygments[:10]:\n print(\n f\"[TIMING] {uuid}: {duration * 1000:.1f}ms\",\n flush=True,\n )\n","structuredPatch":[{"oldStart":20,"oldLines":23,"newStart":20,"newLines":13,"lines":[" \"\"\"Set a timing variable in the global timing data dict."," "," Args:","- name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\")","+ name: Variable name (e.g., \"_markdown_timings\", \"_pygments_timings\", \"_current_msg_uuid\")"," value: Value to set"," \"\"\""," if DEBUG_TIMING:"," _timing_data[name] = value"," "," ","-def set_current_msg_uuid(uuid: str) -> None:","- \"\"\"Set the current message UUID for timing tracking.","-","- Args:","- uuid: Message UUID","- \"\"\"","- if DEBUG_TIMING:","- _timing_data[\"_current_msg_uuid\"] = uuid","-","-"," @contextmanager"," def timing_stat(list_name: str) -> Iterator[None]:"," \"\"\"Context manager for tracking timing statistics."]}],"userModified":false,"replaceAll":false}} \ No newline at end of file diff --git a/test/test_preview_truncation.py b/test/test_preview_truncation.py new file mode 100644 index 00000000..0ccb45c4 --- /dev/null +++ b/test/test_preview_truncation.py @@ -0,0 +1,129 @@ +"""Tests for code preview truncation in tool results. + +Regression test for the bug where Pygments highlighted code previews +weren't truncated because the code assumed multiple rows per line, +but HtmlFormatter(linenos="table") produces a single with two s. +""" + +from pathlib import Path + +from claude_code_log.parser import load_transcript +from claude_code_log.renderer import generate_html, _truncate_highlighted_preview + + +class TestPreviewTruncation: + """Tests for preview truncation in collapsible code blocks.""" + + def test_truncate_highlighted_preview_function(self): + """Test the _truncate_highlighted_preview helper directly.""" + # Simulate Pygments output with 10 lines + html = """
 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+10
line 1
+line 2
+line 3
+line 4
+line 5
+line 6
+line 7
+line 8
+line 9
+line 10
+
""" + + # Truncate to 5 lines + result = _truncate_highlighted_preview(html, 5) + + # Should have lines 1-5 in linenos + assert ' 1' in result + assert ' 5' in result + # Should NOT have lines 6-10 + assert ' 6' not in result + assert '10' not in result + + # Should have lines 1-5 in code + assert "line 1" in result + assert "line 5" in result + # Should NOT have lines 6-10 + assert "line 6" not in result + assert "line 10" not in result + + def test_edit_tool_result_preview_truncation(self): + """Test that Edit tool results have truncated previews in collapsible blocks. + + Regression test for: Preview extraction was looking for multiple tags, + but Pygments produces a single with two s, so the fallback showed + full content instead of truncated preview. + """ + test_data_path = Path(__file__).parent / "test_data" / "edit_tool.jsonl" + + messages = load_transcript(test_data_path) + html = generate_html(messages, "Edit Tool Test") + + # The Edit tool result has 17 lines (>12), so should be collapsible + assert "collapsible-code" in html, "Should have collapsible code block" + + # Find the preview content section + assert "preview-content" in html, "Should have preview content" + + # The preview should only show first 5 lines, not all 17 + # Line 15 is "_timing_data: Dict[str, Any] = {}" - should be in preview + # Line 31 is 'def timing_stat(list_name: str) -> Iterator[None]:"' - should NOT be in preview + + # Extract preview content (between preview-content div tags) + import re + + preview_match = re.search( + r"
(.*?)
\s*", + html, + re.DOTALL, + ) + assert preview_match, "Should find preview-content div" + preview_html = preview_match.group(1) + + # Preview should have early lines (within first 5) + # Line 15 (line 1 of snippet): "_timing_data" + assert "_timing_data" in preview_html, "Preview should contain line 15 content" + + # Preview should NOT have later lines (beyond first 5) + # Line 26 (line 12 of snippet): "if DEBUG_TIMING:" + # Note: Pygments wraps tokens in tags, so check for identifier + assert "DEBUG_TIMING" not in preview_html, ( + "Preview should NOT contain line 26 content (beyond 5 lines)" + ) + + # Line 30-31 (line 16-17 of snippet): "@contextmanager" + assert "contextmanager" not in preview_html, ( + "Preview should NOT contain line 30 content (beyond 5 lines)" + ) + + def test_full_content_still_available(self): + """Test that full content is still available in the expanded section.""" + test_data_path = Path(__file__).parent / "test_data" / "edit_tool.jsonl" + + messages = load_transcript(test_data_path) + html = generate_html(messages, "Edit Tool Test") + + # The full content section should have all lines + import re + + full_match = re.search( + r"
(.*?)
\s*
", + html, + re.DOTALL, + ) + assert full_match, "Should find code-full div" + full_html = full_match.group(1) + + # Full content should have both early and late lines + # Note: Pygments wraps tokens in tags, so we check for the identifier + assert "_timing_data" in full_html, "Full content should contain line 15" + assert "DEBUG_TIMING" in full_html, "Full content should contain line 26" + assert "contextmanager" in full_html, "Full content should contain line 30"