diff --git a/webhook_server/libs/log_parser.py b/webhook_server/libs/log_parser.py
index ff0f494b..81e64427 100644
--- a/webhook_server/libs/log_parser.py
+++ b/webhook_server/libs/log_parser.py
@@ -313,7 +313,9 @@ def parse_log_file(self, file_path: Path) -> list[LogEntry]:
     def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
         """Parse a JSONL log entry into a LogEntry object.
 
-        Parses JSONL format (one compact JSON object per line).
+        Routes JSON entries by their ``type`` field:
+        - ``"log_entry"`` -> individual structured log line
+        - ``"webhook_summary"`` (or missing) -> legacy webhook summary
 
         Args:
             json_line: Raw JSON string from webhooks_*.json files (single line)
@@ -329,6 +331,80 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
         except json.JSONDecodeError:
             return None
 
+        if not isinstance(data, dict):
+            return None
+
+        entry_type = data.get("type", "webhook_summary")  # backward compat
+        if entry_type == "log_entry":
+            return self._parse_json_log_line(data)
+        return self._parse_json_webhook_summary(data)
+
+    def _parse_json_log_line(self, data: dict[str, Any]) -> LogEntry | None:
+        """Parse an individual JSON log line entry (type="log_entry") into a LogEntry.
+
+        Expected JSON format::
+
+            {
+                "type": "log_entry",
+                "timestamp": "ISO8601",
+                "level": "INFO",
+                "logger_name": "GithubWebhook",
+                "message": "Processing webhook",
+                "hook_id": "abc123",
+                "event_type": "pull_request",
+                "repository": "org/repo",
+                "pr_number": 123,
+                "api_user": "bot-user"
+            }
+
+        Args:
+            data: Parsed JSON dictionary with type="log_entry"
+
+        Returns:
+            LogEntry object if parsing successful, None otherwise
+        """
+        # Parse timestamp
+        timestamp_str = data.get("timestamp", "")
+        if not timestamp_str:
+            return None
+
+        try:
+            timestamp = datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
+            if timestamp.tzinfo is None:
+                timestamp = timestamp.replace(tzinfo=datetime.UTC)
+        except (ValueError, TypeError):
+            return None
+
+        task_id, task_type, task_status, cleaned_message = self._extract_task_fields(data.get("message", ""))
+        token_spend = self.extract_token_spend(cleaned_message)
+
+        return LogEntry(
+            timestamp=timestamp,
+            level=data.get("level", "INFO"),
+            logger_name=data.get("logger_name", "GithubWebhook"),
+            message=cleaned_message,
+            hook_id=data.get("hook_id"),
+            event_type=data.get("event_type"),
+            repository=data.get("repository"),
+            pr_number=data.get("pr_number"),
+            github_user=data.get("api_user"),
+            task_id=task_id,
+            task_type=task_type,
+            task_status=task_status,
+            token_spend=token_spend,
+        )
+
+    def _parse_json_webhook_summary(self, data: dict[str, Any]) -> LogEntry | None:
+        """Parse a webhook summary JSON entry into a LogEntry.
+
+        Handles legacy format where ``type`` is ``"webhook_summary"`` or missing.
+
+        Args:
+            data: Parsed JSON dictionary with type="webhook_summary" (or no type)
+
+        Returns:
+            LogEntry object if parsing successful, None otherwise
+        """
         # Parse timestamp from timing.started_at
         try:
             timing = data.get("timing", {})
@@ -345,21 +421,25 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
         pr_data = data.get("pr") or {}
         pr_number = pr_data.get("number") if pr_data else None
 
-        # Create summary message
-        message = self._create_json_summary_message(data)
-
-        # Derive task_status from success field
-        success = data.get("success")
-        if success is True:
-            task_status = "completed"
-        elif success is False:
-            task_status = "failed"
+        # Read status from new field, fall back to deriving from success (backward compat)
+        status = data.get("status")
+        if status:
+            task_status = status  # "success", "failed", "partial"
         else:
-            task_status = None
+            success = data.get("success")
+            if success is True:
+                task_status = "success"
+            elif success is False:
+                task_status = "failed"
+            else:
+                task_status = None
+
+        # Create summary message (after task_status resolution so it can use the status)
+        message = self._create_json_summary_message(data, task_status)
 
         return LogEntry(
             timestamp=timestamp,
-            level="INFO",  # JSON logs don't have levels, default to INFO
+            level=data.get("level", "COMPLETED"),
             logger_name="GithubWebhook",
             message=message,
             hook_id=data.get("hook_id"),
@@ -373,11 +453,12 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
             token_spend=data.get("token_spend"),
         )
 
-    def _create_json_summary_message(self, data: dict[str, Any]) -> str:
+    def _create_json_summary_message(self, data: dict[str, Any], task_status: str | None = None) -> str:
         """Create a summary message from JSON log data.
 
         Args:
             data: Parsed JSON log data
+            task_status: Resolved task status ("success", "failed", "partial", or None)
 
         Returns:
             Human-readable summary message
@@ -399,7 +480,17 @@ def _create_json_summary_message(self, data: dict[str, Any]) -> str:
         if pr_data and pr_data.get("number"):
             parts.append(f"PR #{pr_data['number']}")
 
-        if data.get("success"):
+        if task_status == "partial":
+            parts.append("- completed with partial failures")
+        elif task_status == "success":
+            parts.append("- completed successfully")
+        elif task_status == "failed":
+            parts.append("- failed")
+            error = data.get("error")
+            error_type = error.get("type") if isinstance(error, dict) else None
+            if error_type:
+                parts.append(f"({error_type})")
+        elif data.get("success"):
             parts.append("- completed successfully")
         else:
             parts.append("- failed")
@@ -465,10 +556,15 @@ def get_raw_json_entry(self, json_line: str) -> dict[str, Any] | None:
             return None
 
         try:
-            return json.loads(json_line)
+            data = json.loads(json_line)
         except json.JSONDecodeError:
             return None
 
+        if not isinstance(data, dict):
+            return None
+
+        return data
+
     async def tail_log_file(self, file_path: Path, follow: bool = True) -> AsyncGenerator[LogEntry]:
         """
         Tail a log file and yield new LogEntry objects as they are added.
@@ -501,13 +597,41 @@ async def tail_log_file(self, file_path: Path, follow: bool = True) -> AsyncGene
             # Not following, exit when no more data
             break
 
-    async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") -> AsyncGenerator[LogEntry]:
+    async def tail_json_log_file(self, file_path: Path, follow: bool = True) -> AsyncGenerator[LogEntry]:
+        """Tail a JSON log file and yield new LogEntry objects as they are added.
+
+        Args:
+            file_path: Path to the JSONL file to monitor
+            follow: Whether to continue monitoring for new entries
+
+        Yields:
+            LogEntry objects for new JSON log lines
+        """
+        if not file_path.exists():
+            return
+
+        with open(file_path, encoding="utf-8") as f:
+            # Move to end of file
+            f.seek(0, 2)
+
+            while True:
+                line = f.readline()
+                if line:
+                    entry = self.parse_json_log_entry(line)
+                    if entry:
+                        yield entry
+                elif follow:
+                    await asyncio.sleep(0.1)
+                else:
+                    break
+
+    async def monitor_log_directory(self, log_dir: Path, pattern: str = "webhooks_*.json") -> AsyncGenerator[LogEntry]:
         """
         Monitor a directory for log files and yield new entries from all files.
 
         Args:
             log_dir: Directory path containing log files
-            pattern: Glob pattern for log files (default: "*.log")
+            pattern: Glob pattern for log files (default: "webhooks_*.json")
 
         Yields:
             LogEntry objects from all monitored log files
@@ -518,6 +642,9 @@ async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") ->
         # Find all existing log files including rotated ones
         log_files: list[Path] = []
         log_files.extend(log_dir.glob(pattern))
+        # Backward-compatible fallback for deployments that still stream plain .log files
+        if not log_files and pattern == "webhooks_*.json":
+            log_files.extend(log_dir.glob("*.log"))
         # Only monitor current log file, not rotated ones for real-time
         current_log_files = [
             f for f in log_files if not any(f.name.endswith(ext) for ext in [".1", ".2", ".3", ".4", ".5"])
@@ -531,8 +658,13 @@ async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") ->
         current_log_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
         most_recent_file = current_log_files[0]
 
-        async for entry in self.tail_log_file(most_recent_file, follow=True):
-            yield entry
+        # Use appropriate tailer based on file type
+        if most_recent_file.suffix == ".json":
+            async for entry in self.tail_json_log_file(most_recent_file, follow=True):
+                yield entry
+        else:
+            async for entry in self.tail_log_file(most_recent_file, follow=True):
+                yield entry
 
 
 class LogFilter:
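With the routing above, a single webhooks_*.json stream can mix both entry shapes. A minimal sketch of the two inputs and the routed results (all values are made up; LogParser and its methods are as defined in this diff):

    import json

    from webhook_server.libs.log_parser import LogParser

    parser = LogParser()

    # Individual structured log line -> _parse_json_log_line
    line_entry = json.dumps({
        "type": "log_entry",
        "timestamp": "2025-07-31T10:30:00+00:00",
        "level": "INFO",
        "logger_name": "GithubWebhook",
        "message": "Processing webhook",
        "hook_id": "abc123",
    })

    # No "type" key -> treated as a legacy webhook summary
    summary_entry = json.dumps({
        "hook_id": "abc123",
        "event_type": "pull_request",
        "repository": "org/repo",
        "status": "partial",
        "success": True,
        "timing": {"started_at": "2025-07-31T10:30:00Z"},
    })

    line_result = parser.parse_json_log_entry(line_entry)
    summary_result = parser.parse_json_log_entry(summary_entry)
    assert line_result is not None and line_result.level == "INFO"
    assert summary_result is not None and summary_result.task_status == "partial"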
diff --git a/webhook_server/tests/test_context.py b/webhook_server/tests/test_context.py
index 155af5e7..615db6df 100644
--- a/webhook_server/tests/test_context.py
+++ b/webhook_server/tests/test_context.py
@@ -325,7 +325,11 @@ def test_to_dict_returns_correct_structure(self, mock_datetime):
         assert result["initial_rate_limit"] == 5000
         assert result["final_rate_limit"] == 4985
 
-        # Status
+        # Level and status
+        assert result["level"] == "COMPLETED"
+        assert result["status"] == "success"
+
+        # Success
         assert result["success"] is True
         assert result["error"] is None
 
@@ -391,11 +395,106 @@ def test_to_dict_with_error(self, mock_datetime):
 
         result = ctx.to_dict()
 
+        assert result["level"] == "COMPLETED"
+        assert result["status"] == "failed"
         assert result["success"] is False
         assert result["error"] is not None
         assert result["error"]["type"] == "ValueError"
         assert result["error"]["message"] == "Something went wrong"
 
+    def test_derive_level_always_returns_completed(self):
+        """Test _derive_level always returns COMPLETED regardless of step or context state."""
+        ctx = WebhookContext(
+            hook_id="hook-level-1",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        # Default state (success=True, no steps)
+        assert ctx._derive_level() == "COMPLETED"
+
+        # With a failed step
+        ctx.workflow_steps["some_step"] = {"status": "failed"}
+        assert ctx._derive_level() == "COMPLETED"
+
+        # With success=False
+        ctx.success = False
+        assert ctx._derive_level() == "COMPLETED"
+
+        # With can_merge=False step
+        ctx.workflow_steps["merge_check"] = {"status": "completed", "can_merge": False}
+        assert ctx._derive_level() == "COMPLETED"
+
+    def test_derive_status_returns_success_when_all_ok(self):
+        """Test _derive_status returns 'success' when overall success and all steps succeeded."""
+        ctx = WebhookContext(
+            hook_id="hook-status-1",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        ctx.workflow_steps["step1"] = {"status": "completed", "can_merge": True}
+        ctx.workflow_steps["step2"] = {"status": "completed", "success": True}
+        assert ctx._derive_status() == "success"
+
+    def test_derive_status_returns_success_with_no_steps(self):
+        """Test _derive_status returns 'success' when there are no workflow steps."""
+        ctx = WebhookContext(
+            hook_id="hook-status-2",
+            event_type="push",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        assert ctx._derive_status() == "success"
+
+    def test_derive_status_returns_failed_when_success_is_false(self):
+        """Test _derive_status returns 'failed' when overall success is False."""
+        ctx = WebhookContext(
+            hook_id="hook-status-3",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        ctx.success = False
+        assert ctx._derive_status() == "failed"
+
+    def test_derive_status_returns_partial_when_step_failed(self):
+        """Test _derive_status returns 'partial' when webhook succeeded but a step failed."""
+        ctx = WebhookContext(
+            hook_id="hook-status-4",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        ctx.workflow_steps["good_step"] = {"status": "completed", "success": True}
+        ctx.workflow_steps["bad_step"] = {"status": "failed"}
+        assert ctx._derive_status() == "partial"
+
+    def test_derive_status_returns_partial_when_completed_step_not_successful(self):
+        """Test _derive_status returns 'partial' when a completed step has failure indicators."""
+        ctx = WebhookContext(
+            hook_id="hook-status-5",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        ctx.workflow_steps["merge_check"] = {"status": "completed", "can_merge": False}
+        assert ctx._derive_status() == "partial"
+
+    def test_derive_status_returns_partial_with_error_in_completed_step(self):
+        """Test _derive_status returns 'partial' when completed step has error field."""
+        ctx = WebhookContext(
+            hook_id="hook-status-6",
+            event_type="pull_request",
+            repository="owner/repo",
+            repository_full_name="owner/repo",
+        )
+        ctx.workflow_steps["step_with_error"] = {
+            "status": "completed",
+            "error": "something went wrong",
+        }
+        assert ctx._derive_status() == "partial"
+
 
 class TestContextManagement:
     """Tests for module-level context management functions."""
diff --git a/webhook_server/tests/test_json_log_handler.py b/webhook_server/tests/test_json_log_handler.py
new file mode 100644
index 00000000..af5bde50
--- /dev/null
+++ b/webhook_server/tests/test_json_log_handler.py
@@ -0,0 +1,320 @@
+"""Tests for webhook_server/utils/json_log_handler.py.
+
+Tests JsonLogHandler, which writes log records as JSONL to date-based webhook
+log files, enriched with WebhookContext data when available.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from collections.abc import Generator
+from datetime import UTC, datetime
+from pathlib import Path
+from unittest.mock import Mock, patch
+
+import pytest
+
+from webhook_server.utils.context import clear_context, create_context
+from webhook_server.utils.json_log_handler import JsonLogHandler
+
+
+@pytest.fixture(autouse=True)
+def cleanup_context() -> Generator[None, None, None]:
+    """Clean up context after each test."""
+    yield
+    clear_context()
+
+
+@pytest.fixture
+def log_dir(tmp_path: Path) -> Path:
+    """Return a temporary log directory path."""
+    return tmp_path / "logs"
+
+
+@pytest.fixture
+def handler(log_dir: Path) -> JsonLogHandler:
+    """Create a JsonLogHandler pointing at a temporary directory."""
+    return JsonLogHandler(log_dir=str(log_dir))
+
+
+@pytest.fixture
+def logger_with_handler(handler: JsonLogHandler) -> logging.Logger:
+    """Create a stdlib logger wired to the JsonLogHandler."""
+    logger = logging.getLogger("test_json_log_handler")
+    logger.setLevel(logging.DEBUG)
+    logger.handlers.clear()
+    logger.addHandler(handler)
+    return logger
+
+
+def _read_log_lines(log_dir: Path) -> list[dict]:
+    """Read all JSONL lines from the current-date log file in *log_dir*."""
+    date_str = datetime.now(UTC).strftime("%Y-%m-%d")
+    log_file = log_dir / f"webhooks_{date_str}.json"
+    assert log_file.exists(), f"Expected log file {log_file} to exist"
+    lines = log_file.read_text(encoding="utf-8").strip().splitlines()
+    return [json.loads(line) for line in lines]
+
+
+class TestJsonLogHandlerInit:
+    """Tests for handler initialisation."""
+
+    def test_creates_log_dir_if_missing(self, tmp_path: Path) -> None:
+        """Handler __init__ creates the log directory when it does not exist."""
+        new_dir = tmp_path / "nonexistent" / "logs"
+        assert not new_dir.exists()
+        JsonLogHandler(log_dir=str(new_dir))
+        assert new_dir.is_dir()
+
+    def test_existing_log_dir_is_fine(self, tmp_path: Path) -> None:
+        """Handler __init__ succeeds when the log directory already exists."""
+        existing = tmp_path / "logs"
+        existing.mkdir()
+        handler = JsonLogHandler(log_dir=str(existing))
+        assert handler.log_dir == existing
+
+
+class TestEmitBasic:
+    """Tests for basic emit behaviour and entry format."""
+
+    def test_emit_writes_valid_jsonl(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """A single emit produces a valid JSONL line in the correct file."""
+        logger_with_handler.info("hello world")
+        entries = _read_log_lines(log_dir)
+        assert len(entries) == 1
+
+    def test_entry_format_fields(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Each entry contains the mandatory fields."""
+        logger_with_handler.warning("something happened")
+        entry = _read_log_lines(log_dir)[0]
+
+        assert entry["type"] == "log_entry"
+        assert "timestamp" in entry
+        # Verify timestamp is valid ISO format
+        datetime.fromisoformat(entry["timestamp"])
+        assert entry["level"] == "WARNING"
+        assert entry["logger_name"] == "test_json_log_handler"
+        assert entry["message"] == "something happened"
+
+    def test_date_based_file_naming(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Log file follows webhooks_YYYY-MM-DD.json naming convention."""
+        logger_with_handler.info("test")
+        date_str = datetime.now(UTC).strftime("%Y-%m-%d")
+        expected = log_dir / f"webhooks_{date_str}.json"
+        assert expected.exists()
+
+    def test_append_behaviour(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Multiple emits append to the same file."""
+        logger_with_handler.info("first")
+        logger_with_handler.info("second")
+        logger_with_handler.info("third")
+
+        entries = _read_log_lines(log_dir)
+        assert len(entries) == 3
+        assert entries[0]["message"] == "first"
+        assert entries[1]["message"] == "second"
+        assert entries[2]["message"] == "third"
+
+
+class TestLogLevels:
+    """Tests for different log levels."""
+
+    @pytest.mark.parametrize(
+        ("method", "expected_level"),
+        [
+            ("debug", "DEBUG"),
+            ("info", "INFO"),
+            ("warning", "WARNING"),
+            ("error", "ERROR"),
+        ],
+    )
+    def test_level_is_recorded(
+        self,
+        logger_with_handler: logging.Logger,
+        log_dir: Path,
+        method: str,
+        expected_level: str,
+    ) -> None:
+        """Log level is correctly captured in the entry."""
+        getattr(logger_with_handler, method)("test message")
+        entry = _read_log_lines(log_dir)[0]
+        assert entry["level"] == expected_level
+
+
+class TestAnsiStripping:
+    """Tests for ANSI escape code removal."""
+
+    def test_ansi_codes_stripped_from_message(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """ANSI colour codes are removed from the message before writing."""
+        logger_with_handler.info("\x1b[31mRed text\x1b[0m and \x1b[1;32mbold green\x1b[0m")
+        entry = _read_log_lines(log_dir)[0]
+        assert "\x1b" not in entry["message"]
+        assert "Red text" in entry["message"]
+        assert "bold green" in entry["message"]
+
+    def test_message_without_ansi_unchanged(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Plain messages pass through without modification."""
+        logger_with_handler.info("plain message")
+        entry = _read_log_lines(log_dir)[0]
+        assert entry["message"] == "plain message"
+
+
+class TestContextEnrichment:
+    """Tests for WebhookContext enrichment."""
+
+    def test_context_fields_added_when_context_set(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """When WebhookContext is active, its fields appear in the entry."""
+        ctx = create_context(
+            hook_id="delivery-abc",
+            event_type="pull_request",
+            repository="org/repo",
+            repository_full_name="org/repo",
+            action="opened",
+            sender="user1",
+            api_user="bot-user",
+        )
+        ctx.pr_number = 42
+
+        logger_with_handler.info("processing webhook")
+        entry = _read_log_lines(log_dir)[0]
+
+        assert entry["hook_id"] == "delivery-abc"
+        assert entry["event_type"] == "pull_request"
+        assert entry["repository"] == "org/repo"
+        assert entry["pr_number"] == 42
+        assert entry["api_user"] == "bot-user"
+
+    def test_no_context_fields_when_context_absent(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Without WebhookContext, context-specific fields are absent."""
+        logger_with_handler.info("no context here")
+        entry = _read_log_lines(log_dir)[0]
+
+        assert "hook_id" not in entry
+        assert "event_type" not in entry
+        assert "repository" not in entry
+        assert "pr_number" not in entry
+        assert "api_user" not in entry
+
+    def test_basic_fields_present_without_context(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Basic fields (type, timestamp, level, logger_name, message) always present."""
+        logger_with_handler.info("basic info")
+        entry = _read_log_lines(log_dir)[0]
+
+        assert entry["type"] == "log_entry"
+        assert "timestamp" in entry
+        assert entry["level"] == "INFO"
+        assert entry["logger_name"] == "test_json_log_handler"
+        assert entry["message"] == "basic info"
+
+
+class TestExceptionTraceback:
+    """Tests for exception traceback capture."""
+
+    def test_emit_with_exception_includes_traceback(
+        self, logger_with_handler: logging.Logger, log_dir: Path
+    ) -> None:
+        """Test that exception info is included in JSON entry."""
+        try:
+            raise ValueError("test error")
+        except ValueError:
+            logger_with_handler.exception("Something failed")
+
+        entries = _read_log_lines(log_dir)
+        assert len(entries) == 1
+        entry = entries[0]
+
+        assert entry["level"] == "ERROR"
+        assert "Something failed" in entry["message"]
+        assert "exc_info" in entry
+        assert "ValueError" in entry["exc_info"]
+        assert "test error" in entry["exc_info"]
+        assert "Traceback" in entry["exc_info"]
+
+    def test_emit_without_exception_has_no_exc_info(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Normal log entries do not contain exc_info field."""
+        logger_with_handler.info("no error here")
+        entry = _read_log_lines(log_dir)[0]
+        assert "exc_info" not in entry
+
+
+class TestErrorHandling:
+    """Tests for error resilience."""
+
+    def test_emit_does_not_crash_on_write_failure(self, handler: JsonLogHandler) -> None:
+        """Handler silently handles write failures via handleError."""
+        record = logging.LogRecord(
+            name="test",
+            level=logging.INFO,
+            pathname="",
+            lineno=0,
+            msg="test message",
+            args=(),
+            exc_info=None,
+        )
+
+        with patch.object(handler, "_append_to_file", side_effect=OSError("Disk full")):
+            # Must not raise
+            handler.emit(record)
+
+    def test_emit_does_not_crash_on_open_failure(
+        self,
+        handler: JsonLogHandler,
+    ) -> None:
+        """Handler silently handles open() failures."""
+        record = logging.LogRecord(
+            name="test",
+            level=logging.INFO,
+            pathname="",
+            lineno=0,
+            msg="test message",
+            args=(),
+            exc_info=None,
+        )
+
+        with patch("builtins.open", side_effect=OSError("Permission denied")):
+            handler.emit(record)
+
+
+class TestFileLocking:
+    """Tests for fcntl file locking behaviour."""
+
+    @patch("webhook_server.utils.json_log_handler.HAS_FCNTL", new=True)
+    @patch("fcntl.flock")
+    def test_flock_called_when_available(
+        self, mock_flock: Mock, logger_with_handler: logging.Logger, log_dir: Path
+    ) -> None:
+        """When HAS_FCNTL is True, fcntl.flock is called for lock/unlock."""
+        logger_with_handler.info("locked write")
+        assert mock_flock.call_count >= 2  # At least LOCK_EX + LOCK_UN
+
+    @patch("webhook_server.utils.json_log_handler.HAS_FCNTL", new=False)
+    def test_works_without_fcntl(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """When HAS_FCNTL is False, writing still works without locking."""
+        logger_with_handler.info("no lock write")
+        entries = _read_log_lines(log_dir)
+        assert len(entries) == 1
+        assert entries[0]["message"] == "no lock write"
+
+
+class TestUnicodeContent:
+    """Tests for Unicode content handling."""
+
+    def test_unicode_message_written_correctly(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Messages with Unicode characters are preserved."""
+        logger_with_handler.info("测试 emoji 🚀 and accents: éàü")
+        entry = _read_log_lines(log_dir)[0]
+        assert "测试" in entry["message"]
+        assert "🚀" in entry["message"]
+        assert "éàü" in entry["message"]
+
+    def test_unicode_in_context_fields(self, logger_with_handler: logging.Logger, log_dir: Path) -> None:
+        """Unicode in WebhookContext fields is preserved."""
+        create_context(
+            hook_id="hook-unicode",
+            event_type="pull_request",
+            repository="org/日本語リポ",
+            repository_full_name="org/日本語リポ",
+            api_user="用户",
+        )
+        logger_with_handler.info("ctx test")
+        entry = _read_log_lines(log_dir)[0]
+        assert entry["repository"] == "org/日本語リポ"
+        assert entry["api_user"] == "用户"
diff --git a/webhook_server/tests/test_log_parser.py b/webhook_server/tests/test_log_parser.py
index e5bf7868..9a99fc49 100644
--- a/webhook_server/tests/test_log_parser.py
+++ b/webhook_server/tests/test_log_parser.py
@@ -3,6 +3,7 @@
 import asyncio
 import contextlib
 import datetime
+import json
 import logging
 import tempfile
 import unittest.mock
@@ -781,6 +782,7 @@ def test_parse_json_log_entry_valid_json(self) -> None:
             "action": "opened",
             "repository": "org/test-repo",
             "api_user": "test-user",
+            "level": "COMPLETED",
             "success": true,
             "token_spend": 35,
             "timing": {
@@ -796,14 +798,14 @@
 
         assert entry is not None
         assert entry.timestamp == datetime.datetime(2025, 7, 31, 10, 30, 0, 123000, tzinfo=datetime.UTC)
-        assert entry.level == "INFO"
+        assert entry.level == "COMPLETED"
         assert entry.logger_name == "GithubWebhook"
         assert entry.hook_id == "abc123-def456"
         assert entry.event_type == "pull_request"
         assert entry.repository == "org/test-repo"
         assert entry.pr_number == 123
         assert entry.github_user == "test-user"
-        assert entry.task_status == "completed"
+        assert entry.task_status == "success"
         assert entry.token_spend == 35
         assert "pull_request/opened" in entry.message
         assert "org/test-repo" in entry.message
@@ -819,6 +821,7 @@ def test_parse_json_log_entry_failed_webhook(self) -> None:
             "action": "created",
             "repository": "org/repo",
             "api_user": "user1",
+            "level": "COMPLETED",
             "success": false,
             "token_spend": 10,
             "timing": {
@@ -835,6 +838,7 @@
         entry = parser.parse_json_log_entry(json_line)
 
         assert entry is not None
+        assert entry.level == "COMPLETED"
         assert entry.hook_id == "failed-hook"
         assert entry.task_status == "failed"
         assert entry.pr_number == 456
@@ -849,6 +853,7 @@ def test_parse_json_log_entry_without_pr(self) -> None:
             "event_type": "push",
             "repository": "org/repo",
             "api_user": "user2",
+            "level": "COMPLETED",
             "success": true,
             "timing": {
                 "started_at": "2025-07-31T12:00:00Z"
@@ -879,6 +884,22 @@ def test_parse_json_log_entry_invalid_json(self) -> None:
             entry = parser.parse_json_log_entry(line)
             assert entry is None
 
+    def test_parse_json_log_entry_non_dict_json(self) -> None:
+        """Test parsing valid JSON that is not a dict returns None."""
+        parser = LogParser()
+        non_dict_lines = [
+            "[]",
+            '"hello"',
+            "42",
+            "[1, 2, 3]",
+            "true",
+            "null",
+        ]
+
+        for line in non_dict_lines:
+            entry = parser.parse_json_log_entry(line)
+            assert entry is None, f"Expected None for non-dict JSON: {line}"
+
     def test_parse_json_log_entry_missing_timestamp(self) -> None:
         """Test parsing JSON without timing.started_at returns None."""
         parser = LogParser()
@@ -918,6 +939,7 @@ def test_parse_json_log_entry_timezone_handling(self) -> None:
             "hook_id": "z-time",
             "event_type": "push",
             "repository": "org/repo",
+            "level": "COMPLETED",
             "success": true,
             "timing": {"started_at": "2025-07-31T10:00:00Z"}
         }"""
@@ -930,6 +952,7 @@
             "hook_id": "plus-time",
             "event_type": "push",
             "repository": "org/repo",
+            "level": "COMPLETED",
             "success": true,
             "timing": {"started_at": "2025-07-31T10:00:00+00:00"}
         }"""
@@ -946,6 +969,7 @@ def test_parse_json_log_entry_extracts_all_fields(self) -> None:
             "action": "synchronize",
             "repository": "owner/repo-name",
             "api_user": "github-user",
+            "level": "COMPLETED",
             "success": true,
             "token_spend": 42,
             "timing": {
@@ -963,7 +987,7 @@
 
         assert entry is not None
         assert entry.timestamp == datetime.datetime(2025, 8, 1, 14, 30, 45, 678000, tzinfo=datetime.UTC)
-        assert entry.level == "INFO"
+        assert entry.level == "COMPLETED"
         assert entry.logger_name == "GithubWebhook"
         assert entry.message is not None
         assert entry.hook_id == "complete-hook"
@@ -973,7 +997,7 @@
         assert entry.github_user == "github-user"
         assert entry.task_id is None
         assert entry.task_type is None
-        assert entry.task_status == "completed"
+        assert entry.task_status == "success"
         assert entry.token_spend == 42
 
     def test_parse_json_log_file_multiple_entries(self, tmp_path: Path) -> None:
@@ -982,12 +1006,14 @@
         # Each JSON object must be on a single line (JSON lines format)
         json_content = (
             '{"hook_id": "hook1", "event_type": "push", "repository": "org/repo1", '
-            '"api_user": "user1", "success": true, "timing": {"started_at": "2025-07-31T10:00:00Z"}}\n'
+            '"api_user": "user1", "level": "COMPLETED", "success": true, '
+            '"timing": {"started_at": "2025-07-31T10:00:00Z"}}\n'
             '{"hook_id": "hook2", "event_type": "pull_request", "action": "opened", '
-            '"repository": "org/repo2", "api_user": "user2", "success": true, '
+            '"repository": "org/repo2", "api_user": "user2", "level": "COMPLETED", "success": true, '
             '"timing": {"started_at": "2025-07-31T10:01:00Z"}, "pr": {"number": 123}}\n'
             '{"hook_id": "hook3", "event_type": "issue_comment", "repository": "org/repo3", '
-            '"api_user": "user3", "success": false, "timing": {"started_at": "2025-07-31T10:02:00Z"}}\n'
+            '"api_user": "user3", "level": "COMPLETED", "success": false, '
+            '"timing": {"started_at": "2025-07-31T10:02:00Z"}}\n'
         )
         log_file = tmp_path / "webhooks_test.json"
         log_file.write_text(json_content)
@@ -1000,7 +1026,7 @@
         assert entries[0].repository == "org/repo1"
         assert entries[1].hook_id == "hook2"
         assert entries[1].pr_number == 123
-        assert entries[1].task_status == "completed"
+        assert entries[1].task_status == "success"
         assert entries[2].hook_id == "hook3"
         assert entries[2].task_status == "failed"
 
@@ -1020,11 +1046,11 @@ def test_parse_json_log_file_skips_invalid_lines(self, tmp_path: Path) -> None:
         # Each JSON object must be on a single line (JSON lines format)
         json_content = (
             '{"hook_id": "valid1", "event_type": "push", "repository": "org/repo", '
-            '"success": true, "timing": {"started_at": "2025-07-31T10:00:00Z"}}\n'
+            '"level": "COMPLETED", "success": true, "timing": {"started_at": "2025-07-31T10:00:00Z"}}\n'
             "this is not valid json\n"
             "{incomplete json\n"
             '{"hook_id": "valid2", "event_type": "pull_request", "repository": "org/repo", '
-            '"success": true, "timing": {"started_at": "2025-07-31T10:01:00Z"}}\n'
+            '"level": "COMPLETED", "success": true, "timing": {"started_at": "2025-07-31T10:01:00Z"}}\n'
             '{"missing_timestamp": true}\n'
         )
         log_file = tmp_path / "mixed.json"
@@ -1095,6 +1121,22 @@ def test_get_raw_json_entry_invalid_json(self) -> None:
             result = parser.get_raw_json_entry(line)
             assert result is None
 
+    def test_get_raw_json_entry_non_dict_json(self) -> None:
+        """Test get_raw_json_entry with valid JSON that is not a dict returns None."""
+        parser = LogParser()
+        non_dict_lines = [
+            "[]",
+            '"hello"',
+            "42",
+            "[1, 2, 3]",
+            "true",
+            "null",
+        ]
+
+        for line in non_dict_lines:
+            result = parser.get_raw_json_entry(line)
+            assert result is None, f"Expected None for non-dict JSON: {line}"
+
     def test_get_raw_json_entry_preserves_structure(self) -> None:
         """Test that get_raw_json_entry preserves complete JSON structure."""
         parser = LogParser()
@@ -1133,6 +1175,7 @@ def test_json_summary_message_format_with_action(self) -> None:
             "event_type": "pull_request",
             "action": "synchronize",
             "repository": "org/repo",
+            "level": "COMPLETED",
             "success": true,
             "timing": {"started_at": "2025-07-31T10:00:00Z"}
         }"""
@@ -1149,6 +1192,7 @@ def test_json_summary_message_format_without_action(self) -> None:
             "hook_id": "hook1",
             "event_type": "push",
             "repository": "org/repo",
+            "level": "COMPLETED",
             "success": true,
             "timing": {"started_at": "2025-07-31T10:00:00Z"}
         }"""
@@ -1166,6 +1210,7 @@ def test_parse_json_log_entry_null_pr_field(self) -> None:
             "hook_id": "null-pr",
             "event_type": "push",
             "repository": "org/repo",
+            "level": "COMPLETED",
             "success": true,
             "timing": {"started_at": "2025-07-31T10:00:00Z"},
             "pr": null
@@ -1190,6 +1235,111 @@ def test_parse_json_log_entry_empty_timing_object(self) -> None:
         entry = parser.parse_json_log_entry(json_line)
         assert entry is None
 
+    def test_parse_json_log_entry_type_log_entry(self) -> None:
+        """Test parsing a JSON entry with type='log_entry' routes to _parse_json_log_line."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2026-03-15T12:00:00+00:00",
+            "level": "WARNING",
+            "logger_name": "PullRequestHandler",
+            "message": "PR labels updated",
+            "hook_id": "delivery-999",
+            "event_type": "pull_request",
+            "repository": "org/my-repo",
+            "pr_number": 77,
+            "api_user": "bot-user",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.level == "WARNING"
+        assert entry.logger_name == "PullRequestHandler"
+        assert entry.message == "PR labels updated"
+        assert entry.hook_id == "delivery-999"
+        assert entry.event_type == "pull_request"
+        assert entry.repository == "org/my-repo"
+        assert entry.pr_number == 77
+        assert entry.github_user == "bot-user"
+        # Fields not present in log_entry type
+        assert entry.task_id is None
+        assert entry.task_type is None
+        assert entry.task_status is None
+        assert entry.token_spend is None
+
+    def test_parse_json_log_entry_type_routing_default(self) -> None:
+        """Test that entries without a 'type' field default to webhook_summary parsing."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "no-type-field",
+            "event_type": "push",
+            "action": "completed",
+            "repository": "org/repo",
+            "success": True,
+            "timing": {
+                "started_at": "2026-03-15T08:00:00+00:00",
+            },
+            "pr": None,
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        # Should have been routed through _parse_json_webhook_summary
+        assert entry.hook_id == "no-type-field"
+        assert entry.event_type == "push"
+        assert entry.repository == "org/repo"
+        assert entry.task_status == "success"  # derived from success=True
+
+    def test_parse_json_log_entry_mixed_types(self, tmp_path: Path) -> None:
+        """Test parsing a file containing both log_entry and webhook_summary entries."""
+        parser = LogParser()
+
+        log_entry_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2026-03-15T09:00:00+00:00",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "Started processing",
+            "hook_id": "mixed-hook-1",
+            "event_type": "pull_request",
+            "repository": "org/repo",
+            "pr_number": 10,
+            "api_user": "bot",
+        })
+
+        summary_line = json.dumps({
+            "type": "webhook_summary",
+            "hook_id": "mixed-hook-1",
+            "event_type": "pull_request",
+            "action": "opened",
+            "repository": "org/repo",
+            "success": True,
+            "timing": {
+                "started_at": "2026-03-15T09:00:00+00:00",
+                "completed_at": "2026-03-15T09:00:05+00:00",
+                "duration_ms": 5000,
+            },
+            "pr": {"number": 10, "title": "Test PR", "author": "dev"},
+            "level": "COMPLETED",
+        })
+
+        log_file = tmp_path / "webhooks_2026-03-15.json"
+        log_file.write_text(f"{log_entry_line}\n{summary_line}\n")
+
+        entries = parser.parse_json_log_file(log_file)
+
+        assert len(entries) == 2
+        # First entry is a log_entry
+        assert entries[0].logger_name == "GithubWebhook"
+        assert entries[0].message == "Started processing"
+        assert entries[0].hook_id == "mixed-hook-1"
+        # Second entry is a webhook_summary
+        assert entries[1].hook_id == "mixed-hook-1"
+        assert entries[1].pr_number == 10
+        assert entries[1].task_status == "success"  # derived from success=True
+
 
 class TestAdditionalCoverageTests:
     """Additional tests for edge cases to reach 90%+ coverage."""
@@ -1377,3 +1527,393 @@ def _append_log() -> None:
         # Should have collected exactly 1 entry from the new content (not from rotated files)
         assert len(entries) == 1
         assert entries[0].message == "New entry after monitoring started"
+
+    def test_parse_json_log_line_empty_timestamp(self) -> None:
+        """Test _parse_json_log_line returns None when timestamp is empty."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "test",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is None
+
+    def test_parse_json_log_line_naive_timestamp(self) -> None:
+        """Test _parse_json_log_line adds UTC to naive timestamp."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2025-07-31T10:00:00",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "naive tz test",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.timestamp.tzinfo == datetime.UTC
+        assert entry.message == "naive tz test"
+
+    def test_parse_json_log_line_invalid_timestamp(self) -> None:
+        """Test _parse_json_log_line returns None for invalid timestamp."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "not-a-date",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "bad ts",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is None
+
+    def test_parse_json_log_line_extracts_task_fields(self) -> None:
+        """Test _parse_json_log_line extracts task_id/task_type/task_status from message."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2026-03-15T12:00:00+00:00",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "[task_id=check_tox] [task_type=ci_check] [task_status=started] Running tox checks",
+            "hook_id": "delivery-task",
+            "event_type": "pull_request",
+            "repository": "org/repo",
+            "pr_number": 42,
+            "api_user": "bot",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.task_id == "check_tox"
+        assert entry.task_type == "ci_check"
+        assert entry.task_status == "started"
+        assert entry.message == "Running tox checks"
+
+    def test_parse_json_log_line_extracts_token_spend(self) -> None:
+        """Test _parse_json_log_line extracts token_spend from message."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2026-03-15T12:00:00+00:00",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "Token spend: 35 API calls (initial: 2831, final: 2796)",
+            "hook_id": "delivery-tokens",
+            "event_type": "push",
+            "repository": "org/repo",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.token_spend == 35
+
+    def test_parse_json_log_line_no_task_fields(self) -> None:
+        """Test _parse_json_log_line returns None for task fields when message has no tags."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "type": "log_entry",
+            "timestamp": "2026-03-15T12:00:00+00:00",
+            "level": "INFO",
+            "logger_name": "GithubWebhook",
+            "message": "Just a plain message",
+            "hook_id": "delivery-plain",
+            "event_type": "push",
+            "repository": "org/repo",
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.task_id is None
+        assert entry.task_type is None
+        assert entry.task_status is None
+        assert entry.token_spend is None
+        assert entry.message == "Just a plain message"
+
+    def test_parse_json_webhook_summary_naive_timestamp(self) -> None:
+        """Test _parse_json_webhook_summary adds UTC to naive timestamp."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "naive-tz",
+            "event_type": "push",
+            "repository": "org/repo",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00"},
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.timestamp.tzinfo == datetime.UTC
+
+    def test_parse_json_webhook_summary_success_none(self) -> None:
+        """Test _parse_json_webhook_summary sets task_status to None when success is None."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "null-success",
+            "event_type": "push",
+            "repository": "org/repo",
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.task_status is None
+
+    def test_parse_json_webhook_summary_reads_status_field(self) -> None:
+        """Test _parse_json_webhook_summary reads 'status' field directly when present."""
+        parser = LogParser()
+
+        # Test "success" status
+        json_line = json.dumps({
+            "hook_id": "status-success",
+            "event_type": "push",
+            "repository": "org/repo",
+            "status": "success",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is not None
+        assert entry.task_status == "success"
+
+        # Test "partial" status
+        json_line = json.dumps({
+            "hook_id": "status-partial",
+            "event_type": "pull_request",
+            "repository": "org/repo",
+            "status": "partial",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:01:00Z"},
+        })
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is not None
+        assert entry.task_status == "partial"
+
+        # Test "failed" status from field (overrides success boolean)
+        json_line = json.dumps({
+            "hook_id": "status-failed",
+            "event_type": "push",
+            "repository": "org/repo",
+            "status": "failed",
+            "success": False,
+            "timing": {"started_at": "2025-07-31T10:02:00Z"},
+        })
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is not None
+        assert entry.task_status == "failed"
+
+    def test_parse_json_webhook_summary_falls_back_to_success_boolean(self) -> None:
+        """Test _parse_json_webhook_summary falls back to success boolean when status not present."""
+        parser = LogParser()
+
+        # No status field, success=True -> "success"
+        json_line = json.dumps({
+            "hook_id": "no-status-true",
+            "event_type": "push",
+            "repository": "org/repo",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is not None
+        assert entry.task_status == "success"
+
+        # No status field, success=False -> "failed"
+        json_line = json.dumps({
+            "hook_id": "no-status-false",
+            "event_type": "push",
+            "repository": "org/repo",
+            "success": False,
+            "timing": {"started_at": "2025-07-31T10:01:00Z"},
+        })
+        entry = parser.parse_json_log_entry(json_line)
+        assert entry is not None
+        assert entry.task_status == "failed"
+
+    def test_json_summary_message_partial_status(self) -> None:
+        """Test that partial status produces 'completed with partial failures' message."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "partial-hook",
+            "event_type": "pull_request",
+            "action": "opened",
+            "repository": "org/repo",
+            "status": "partial",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert entry.task_status == "partial"
+        assert "completed with partial failures" in entry.message
+
+    def test_json_summary_message_success_status(self) -> None:
+        """Test that success status produces 'completed successfully' message."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "success-hook",
+            "event_type": "push",
+            "repository": "org/repo",
+            "status": "success",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert "completed successfully" in entry.message
+
+    def test_json_summary_message_failed_status(self) -> None:
+        """Test that failed status produces 'failed' message."""
+        parser = LogParser()
+        json_line = json.dumps({
+            "hook_id": "failed-hook",
+            "event_type": "push",
+            "repository": "org/repo",
+            "status": "failed",
+            "success": False,
+            "error": {"type": "RuntimeError", "message": "boom"},
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+
+        entry = parser.parse_json_log_entry(json_line)
+
+        assert entry is not None
+        assert "failed" in entry.message
+        assert "RuntimeError" in entry.message
+
+    def test_parse_json_log_file_pretty_printed_format(self, tmp_path: Path) -> None:
+        """Test parsing JSON log file with pretty-printed entries separated by blank lines."""
+        parser = LogParser()
+        entry1 = json.dumps(
+            {
+                "hook_id": "pretty1",
+                "event_type": "push",
+                "repository": "org/repo",
+                "level": "COMPLETED",
+                "success": True,
+                "timing": {"started_at": "2025-07-31T10:00:00Z"},
+            },
+            indent=2,
+        )
+        entry2 = json.dumps(
+            {
+                "hook_id": "pretty2",
+                "event_type": "pull_request",
+                "action": "opened",
+                "repository": "org/repo",
+                "level": "COMPLETED",
+                "success": False,
+                "timing": {"started_at": "2025-07-31T10:01:00Z"},
+            },
+            indent=2,
+        )
+        # Separate entries with blank lines (pretty-printed format)
+        log_file = tmp_path / "webhooks_pretty.json"
+        log_file.write_text(f"{entry1}\n\n{entry2}")
+
+        entries = parser.parse_json_log_file(log_file)
+
+        assert len(entries) == 2
+        assert entries[0].hook_id == "pretty1"
+        assert entries[1].hook_id == "pretty2"
+
+    @pytest.mark.asyncio
+    async def test_tail_log_file_nonexistent(self, tmp_path: Path) -> None:
+        """Test tail_log_file returns immediately for non-existent file."""
+        parser = LogParser()
+        nonexistent = tmp_path / "does_not_exist.log"
+
+        entries = [entry async for entry in parser.tail_log_file(nonexistent, follow=False)]
+
+        assert entries == []
+
+    @pytest.mark.asyncio
+    async def test_tail_json_log_file_nonexistent(self, tmp_path: Path) -> None:
+        """Test tail_json_log_file returns immediately for non-existent file."""
+        parser = LogParser()
+        nonexistent = tmp_path / "does_not_exist.json"
+
+        entries = [entry async for entry in parser.tail_json_log_file(nonexistent, follow=False)]
+
+        assert entries == []
+
+    @pytest.mark.asyncio
+    async def test_tail_json_log_file_follow_false_breaks(self, tmp_path: Path) -> None:
+        """Test tail_json_log_file with follow=False breaks when no new data."""
+        parser = LogParser()
+        log_file = tmp_path / "webhooks_tail.json"
+        # Create file with some existing content (tail seeks past it)
+        log_file.write_text('{"hook_id":"old"}\n')
+
+        # With follow=False, after seeking to end, readline returns "" and it breaks
+        entries = [entry async for entry in parser.tail_json_log_file(log_file, follow=False)]
+
+        assert entries == []
+
+    @pytest.mark.asyncio
+    async def test_monitor_log_directory_json_file(self, tmp_path: Path) -> None:
+        """Test monitor_log_directory selects .json file and uses tail_json_log_file."""
+        parser = LogParser()
+
+        # Create a .json log file
+        json_log = tmp_path / "webhooks_2025-07-31.json"
+        json_log.write_text("")
+
+        entries: list[LogEntry] = []
+
+        async def collect_entries(async_gen: AsyncIterator[LogEntry], max_entries: int = 1) -> None:
+            count = 0
+            async for entry in async_gen:
+                entries.append(entry)
+                count += 1
+                if count >= max_entries:
+                    break
+
+        # Start monitoring
+        monitor_task = asyncio.create_task(
+            collect_entries(parser.monitor_log_directory(tmp_path, pattern="webhooks_*.json"), max_entries=1)
+        )
+
+        await asyncio.sleep(0.1)
+
+        # Append a valid JSON entry
+        json_entry = json.dumps({
+            "hook_id": "monitor-json",
+            "event_type": "push",
+            "repository": "org/repo",
+            "level": "COMPLETED",
+            "success": True,
+            "timing": {"started_at": "2025-07-31T10:00:00Z"},
+        })
+
+        def _append() -> None:
+            with open(json_log, "a") as f:
+                f.write(f"{json_entry}\n")
+                f.flush()
+
+        await asyncio.to_thread(_append)
+
+        try:
+            await asyncio.wait_for(monitor_task, timeout=2.0)
+        except TimeoutError:
+            monitor_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await monitor_task
+
+        assert len(entries) == 1
+        assert entries[0].hook_id == "monitor-json"
diff --git a/webhook_server/tests/test_structured_logger.py b/webhook_server/tests/test_structured_logger.py
index b087caab..6d47647e 100644
--- a/webhook_server/tests/test_structured_logger.py
+++ b/webhook_server/tests/test_structured_logger.py
@@ -126,6 +126,7 @@ def test_write_log_writes_valid_json(
             content = f.read().strip()
             # Pretty-printed JSON is multi-line, parse the entire content as one JSON object
             log_entry = json.loads(content)
+            assert log_entry["type"] == "webhook_summary"
             assert log_entry["hook_id"] == "test-hook-123"
             assert log_entry["event_type"] == "pull_request"
             assert log_entry["repository"] == "org/repo"
@@ -377,6 +378,7 @@ def test_write_error_log_with_partial_context(
         with open(log_file) as f:
             log_entry = json.loads(f.read().strip())
 
+        assert log_entry["type"] == "webhook_summary"
         assert log_entry["success"] is False
         assert log_entry["error"] is not None
         assert log_entry["error"]["message"] == "Early failure"
@@ -397,6 +399,7 @@ def test_write_error_log_without_context(self, log_writer: StructuredLogWriter,
         with open(log_file) as f:
             log_entry = json.loads(f.read().strip())
 
+        assert log_entry["type"] == "webhook_summary"
         assert log_entry["hook_id"] == "test-hook-error"
         assert log_entry["event_type"] == "push"
         assert log_entry["repository"] == "org/repo"
diff --git a/webhook_server/utils/context.py b/webhook_server/utils/context.py
index e4d82f3e..395a32e6 100644
--- a/webhook_server/utils/context.py
+++ b/webhook_server/utils/context.py
@@ -331,6 +331,37 @@ def _build_summary(self) -> str | None:
             f"[{_format_duration(duration_ms)}{token_info}] steps=[{steps_str}]"
         )
 
+    def _derive_level(self) -> str:
+        """Derive log level from execution context.
+
+        Webhook summaries always return COMPLETED, since the webhook has
+        finished processing by the time a summary is written. Success and
+        failure are tracked separately in the success and error fields.
+
+        Returns:
+            "COMPLETED" for webhook summary entries
+        """
+        return "COMPLETED"
+
+    def _derive_status(self) -> str:
+        """Derive webhook processing status from execution context.
+
+        Returns:
+            "failed" if the overall webhook failed,
+            "partial" if the webhook succeeded but some steps failed,
+            "success" if everything succeeded
+        """
+        if not self.success:
+            return "failed"
+
+        for step_data in self.workflow_steps.values():
+            if step_data.get("status") == "failed":
+                return "partial"
+            if step_data.get("status") == "completed" and not self._detect_success(step_data):
+                return "partial"
+
+        return "success"
+
     def to_dict(self) -> dict[str, Any]:
         """Convert context to dictionary for JSON serialization.
 
@@ -342,6 +373,8 @@ def to_dict(self) -> dict[str, Any]:
         """
         return {
             "hook_id": self.hook_id,
+            "level": self._derive_level(),
+            "status": self._derive_status(),
             "event_type": self.event_type,
             "action": self.action,
             "sender": self.sender,
diff --git a/webhook_server/utils/helpers.py b/webhook_server/utils/helpers.py
index 5a0521cf..4c45e0bf 100644
--- a/webhook_server/utils/helpers.py
+++ b/webhook_server/utils/helpers.py
@@ -4,15 +4,18 @@
 import contextlib
 import datetime
 import json
+import logging
 import os
 import random
 import re
 import shlex
 import shutil
 import subprocess
+import threading
 from collections.abc import AsyncGenerator
 from concurrent.futures import Future, as_completed
 from logging import Logger
+from pathlib import Path
 from typing import Any
 from uuid import uuid4
 
@@ -27,12 +30,15 @@
 from webhook_server.libs.config import Config
 from webhook_server.libs.exceptions import NoApiTokenError
+from webhook_server.utils.json_log_handler import JsonLogHandler
 from webhook_server.utils.safe_rotating_handler import SafeRotatingFileHandler
 
 # Patch simple_logger to use SafeRotatingFileHandler to prevent crashes
 # when backup log files are missing during rollover
 simple_logger.logger.RotatingFileHandler = SafeRotatingFileHandler
 
+_JSON_HANDLER_LOCK = threading.Lock()
+
 
 def get_logger_with_params(
     repository_name: str = "",
@@ -87,7 +93,7 @@
     # The original 'name' parameter is preserved in log records via the logger name.
     logger_cache_key = os.path.basename(log_file_path_resolved) if log_file_path_resolved else "console"
 
-    return get_logger(
+    logger = get_logger(
         name=logger_cache_key,
         filename=log_file_path_resolved,
         level=log_level,
@@ -97,6 +103,24 @@
         console=True,  # Enable console output for docker logs with FORCE_COLOR support
     )
 
+    # Attach JsonLogHandler for writing log records to the webhook JSONL file.
+    # Only attach when a log file path is configured (skip console-only loggers)
+    # and only once per logger instance to avoid duplicate handlers.
+    # Uses _config.data_dir/logs (same directory as StructuredLogWriter) instead
+    # of deriving from the text log file path, which may differ for absolute paths.
+    if log_file_path_resolved:
+        log_dir = os.path.join(_config.data_dir, "logs")
+        with _JSON_HANDLER_LOCK:
+            if not any(isinstance(h, JsonLogHandler) and h.log_dir == Path(log_dir) for h in logger.handlers):
+                logger.addHandler(
+                    JsonLogHandler(
+                        log_dir=log_dir,
+                        level=getattr(logging, log_level.upper(), logging.DEBUG),
+                    )
+                )
+
+    return logger
+
 
 def get_log_file_path(config: Config, log_file_name: str | None) -> str | None:
     """
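In effect, every file-backed logger now tees its records into the shared JSONL stream. A hedged sketch of what the attachment amounts to, bypassing get_logger_with_params and its config lookup (the directory path and logger name here are made up):

    import logging

    from webhook_server.utils.json_log_handler import JsonLogHandler

    logger = logging.getLogger("GithubWebhook")
    logger.setLevel(logging.INFO)
    logger.addHandler(JsonLogHandler(log_dir="/tmp/webhook-logs", level=logging.INFO))

    # Lands in /tmp/webhook-logs/webhooks_YYYY-MM-DD.json as a
    # {"type": "log_entry", ...} line, alongside any existing text handlers.
    logger.info("Processing webhook")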
diff --git a/webhook_server/utils/json_log_handler.py b/webhook_server/utils/json_log_handler.py
new file mode 100644
index 00000000..5acc8217
--- /dev/null
+++ b/webhook_server/utils/json_log_handler.py
@@ -0,0 +1,154 @@
+"""JSON log handler that writes log records to the webhook JSONL log file.
+
+Intercepts every log record and writes it as a JSON entry to the same
+date-based JSONL file used by webhook summaries. Enriches entries with
+webhook context (hook_id, repository, event_type, etc.) from the
+ContextVar when available.
+
+Architecture:
+- Subclasses logging.Handler for standard library integration
+- Reads WebhookContext from ContextVar for per-request enrichment
+- Atomic append with fcntl file locking (same pattern as StructuredLogWriter)
+- Never crashes the application — uses handleError() on failures
+
+Entry format:
+    {"type": "log_entry", "timestamp": "ISO8601", "level": "INFO",
+     "logger_name": "...", "message": "...", "hook_id": "...", ...}
+"""
+
+import json
+import logging
+import os
+import re
+import traceback
+from datetime import UTC, datetime
+from pathlib import Path
+
+# Platform-specific imports for file locking
+try:
+    import fcntl
+
+    HAS_FCNTL = True
+except ImportError:
+    HAS_FCNTL = False
+
+from webhook_server.utils.context import get_context
+
+# Pre-compiled regex for stripping ANSI escape codes
+_ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
+
+
+class JsonLogHandler(logging.Handler):
+    """Logging handler that writes JSON entries to the webhook JSONL log file.
+
+    Each log record is serialized as a compact JSON object and appended to
+    the date-based log file (webhooks_YYYY-MM-DD.json). The handler enriches
+    entries with webhook context data when available.
+
+    Attributes:
+        log_dir: Directory path for log files
+        fsync_on_write: Whether to call os.fsync() after each write
+    """
+
+    def __init__(self, log_dir: str, level: int = logging.NOTSET, fsync_on_write: bool = False) -> None:
+        """Initialize the JSON log handler.
+
+        Args:
+            log_dir: Directory path where JSONL log files are written
+            level: Minimum log level to handle (default: NOTSET, handles all)
+            fsync_on_write: Whether to call os.fsync() after each write
+                (default: False). Enable for durability guarantees at the
+                cost of higher I/O latency.
+        """
+        super().__init__(level)
+        self.log_dir: Path = Path(log_dir)
+        self.fsync_on_write = fsync_on_write
+        self.log_dir.mkdir(parents=True, exist_ok=True)
+
+    def _get_log_file_path(self) -> Path:
+        """Get log file path for the current UTC date.
+
+        Returns:
+            Path to the log file (e.g., {log_dir}/webhooks_2026-01-05.json)
+        """
+        date_str = datetime.now(UTC).strftime("%Y-%m-%d")
+        return self.log_dir / f"webhooks_{date_str}.json"
+
+    def emit(self, record: logging.LogRecord) -> None:
+        """Write a log record as JSON to the JSONL log file.
+
+        Builds a JSON dict from the LogRecord and enriches it with webhook
+        context data (hook_id, repository, event_type, etc.) when available.
+
+        Args:
+            record: The log record to write
+        """
+        try:
+            entry = self._build_entry(record)
+            log_line = json.dumps(entry, ensure_ascii=False)
+            self._append_to_file(log_line)
+        except Exception:
+            self.handleError(record)
+
+    def _build_entry(self, record: logging.LogRecord) -> dict[str, object]:
+        """Build a JSON-serializable dict from a LogRecord.
+
+        Strips ANSI codes from the message and enriches with webhook
+        context when available.
+
+        Args:
+            record: The log record to convert
+
+        Returns:
+            Dict ready for JSON serialization
+        """
+        message = record.getMessage()
+        message = _ANSI_ESCAPE_RE.sub("", message)
+
+        exc_text: str | None = None
+        if record.exc_info and record.exc_info[0] is not None:
+            exc_text = "".join(traceback.format_exception(*record.exc_info))
+
+        entry: dict[str, object] = {
+            "type": "log_entry",
+            "timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
+            "level": record.levelname,
+            "logger_name": record.name,
+            "message": message,
+        }
+
+        if exc_text:
+            entry["exc_info"] = exc_text
+
+        # Enrich with webhook context when available
+        ctx = get_context()
+        if ctx is not None:
+            entry["hook_id"] = ctx.hook_id
+            entry["event_type"] = ctx.event_type
+            entry["repository"] = ctx.repository
+            entry["pr_number"] = ctx.pr_number
+            entry["api_user"] = ctx.api_user
+
+        return entry
+
+    def _append_to_file(self, log_line: str) -> None:
+        """Atomically append a JSON line to the log file.
+
+        Uses fcntl file locking when available for safe concurrent access.
+
+        Args:
+            log_line: JSON string to append (without trailing newline)
+        """
+        log_file = self._get_log_file_path()
+
+        with open(log_file, "a", encoding="utf-8") as fd:
+            if HAS_FCNTL:
+                fcntl.flock(fd.fileno(), fcntl.LOCK_EX)
+            try:
+                fd.write(f"{log_line}\n")
+                fd.flush()
+                if self.fsync_on_write:
+                    os.fsync(fd.fileno())
+            finally:
+                if HAS_FCNTL:
+                    fcntl.flock(fd.fileno(), fcntl.LOCK_UN)
diff --git a/webhook_server/utils/structured_logger.py b/webhook_server/utils/structured_logger.py
index 9de73b22..e5204ead 100644
--- a/webhook_server/utils/structured_logger.py
+++ b/webhook_server/utils/structured_logger.py
@@ -108,6 +108,7 @@ def write_log(self, context: WebhookContext) -> None:
 
         # Get context dict and update timing locally (without mutating context)
         context_dict = context.to_dict()
+        context_dict["type"] = "webhook_summary"
         if "timing" in context_dict:
             context_dict["timing"]["completed_at"] = completed_at.isoformat()
             if context.started_at:
@@ -213,6 +214,7 @@ def write_error_log(
         else:
             # No context - create minimal error entry
             error_entry = {
+                "type": "webhook_summary",
                 "hook_id": hook_id,
                 "event_type": event_type,
                 "action": None,
@@ -230,6 +232,8 @@
                 "token_spend": None,
                 "initial_rate_limit": None,
                 "final_rate_limit": None,
+                "level": "COMPLETED",
+                "status": "failed",
                 "success": False,
                 "error": {
                     "type": "WebhookProcessingError",
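With both writers tagging their output, a summary line on disk now carries the discriminator plus the derived fields. An abridged, made-up example of the resulting shape:

    # Abridged sketch: real entries carry the full WebhookContext.to_dict() field set.
    summary = {
        "type": "webhook_summary",  # discriminator consumed by LogParser routing
        "hook_id": "abc123",
        "level": "COMPLETED",       # always COMPLETED once processing finished
        "status": "partial",        # one of: success / partial / failed
        "success": True,
        "timing": {"started_at": "2025-07-31T10:30:00Z"},
    }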
diff --git a/webhook_server/web/log_viewer.py b/webhook_server/web/log_viewer.py
index 37a566c9..bac10c0b 100644
--- a/webhook_server/web/log_viewer.py
+++ b/webhook_server/web/log_viewer.py
@@ -696,6 +696,8 @@ async def get_workflow_steps_json(self, hook_id: str) -> dict[str, Any]:
         """
         # Search JSON logs for this hook_id
         async for entry in self._stream_json_log_entries(max_files=25, max_entries=50000):
+            if entry.get("type", "webhook_summary") != "webhook_summary":
+                continue
             if entry.get("hook_id") == hook_id:
                 # Found the entry - transform to frontend-expected format
                 try:
@@ -1089,6 +1091,7 @@ def sort_key(f: Path) -> tuple[int, float]:
             # Use appropriate parser based on file type
             if log_file.suffix == ".json":
                 # JSONL files: one compact JSON object per line
+                # Process both "log_entry" and "webhook_summary" entries
                 async for line in f:
                     entry = self.log_parser.parse_json_log_entry(line)
                     if entry:
@@ -1213,22 +1216,22 @@ async def _stream_json_log_entries(
             try:
                 # Stream JSONL entries incrementally without loading entire file
                 remaining = max_entries - total_yielded
-                line_buffer: deque[str] = deque(maxlen=remaining)
+                entry_buffer: deque[dict[str, Any]] = deque(maxlen=remaining)
 
                 async with aiofiles.open(log_file, encoding="utf-8") as f:
                     # JSONL format: one JSON object per line
                     async for line in f:
-                        line_buffer.append(line.rstrip("\n"))
+                        data = self.log_parser.get_raw_json_entry(line.rstrip("\n"))
+                        if data is not None:
+                            entry_buffer.append(data)
 
-                # Process lines in reverse order (newest first)
-                for line in reversed(line_buffer):
+                # Yield entries in reverse order (newest first)
+                for entry in reversed(entry_buffer):
                     if total_yielded >= max_entries:
                         break
-                    data = self.log_parser.get_raw_json_entry(line)
-                    if data:
-                        yield data
-                        total_yielded += 1
+                    yield entry
+                    total_yielded += 1
             except asyncio.CancelledError:
                 self.logger.debug("Operation cancelled")
                 raise  # Always re-raise CancelledError
diff --git a/webhook_server/web/static/css/log_viewer.css b/webhook_server/web/static/css/log_viewer.css
index ad337980..57c66baf 100644
--- a/webhook_server/web/static/css/log_viewer.css
+++ b/webhook_server/web/static/css/log_viewer.css
@@ -570,13 +570,15 @@ body {
     overflow-wrap: anywhere;
 }
 
+.log-entry.COMPLETED .level { background-color: var(--log-success-bg); color: var(--status-connected-text); }
 .log-entry.INFO .level { background-color: var(--level-info-bg); color: var(--level-info-border); }
 .log-entry.ERROR .level { background-color: var(--log-error-bg); color: #dc3545; }
 .log-entry.WARNING .level { background-color: var(--log-warning-bg); color: #856404; }
 .log-entry.DEBUG .level { background-color: var(--tag-bg); color: var(--text-secondary); }
 .log-entry.STEP .level { background-color: var(--log-step-bg); color: #0056b3; }
-.log-entry.SUCCESS .level { background-color: var(--log-success-bg); color: #155724; }
+.log-entry.SUCCESS .level { background-color: var(--log-success-bg); color: var(--status-connected-text); }
+.log-entry.COMPLETED { background-color: rgba(25, 135, 84, 0.05); }
 .log-entry.INFO { background-color: transparent; }
 .log-entry.ERROR { background-color: rgba(220, 53, 69, 0.05); }
 .log-entry.WARNING { background-color: rgba(255, 193, 7, 0.05); }
diff --git a/webhook_server/web/static/js/log_viewer.js b/webhook_server/web/static/js/log_viewer.js
index a04e484d..530590ce 100644
--- a/webhook_server/web/static/js/log_viewer.js
+++ b/webhook_server/web/static/js/log_viewer.js
@@ -11,13 +11,19 @@ const CONFIG = {
 function updateConnectionStatus(connected) {
     const status = document.getElementById("connectionStatus");
     const statusText = document.getElementById("statusText");
+    const connectBtn = document.getElementById("connectBtn");
+    const disconnectBtn = document.getElementById("disconnectBtn");
     if (connected) {
         status.className = "status connected";
         statusText.textContent = "Connected - Real-time updates active";
+        if (connectBtn) connectBtn.style.display = "none";
+        if (disconnectBtn) disconnectBtn.style.display = "";
     } else {
         status.className = "status disconnected";
         statusText.textContent = "Disconnected - Real-time updates inactive";
+        if (connectBtn) connectBtn.style.display = "";
+        if (disconnectBtn) disconnectBtn.style.display = "none";
     }
 }
 
@@ -174,6 +180,7 @@ function createLogEntryElement(entry) {
 
     // Whitelist of allowed log levels to prevent class-name injection
     const allowedLevels = [
+        "COMPLETED",
         "DEBUG",
         "INFO",
        "WARNING",
diff --git a/webhook_server/web/templates/log_viewer.html b/webhook_server/web/templates/log_viewer.html
index 50b0eff5..e48c692f 100644
--- a/webhook_server/web/templates/log_viewer.html
+++ b/webhook_server/web/templates/log_viewer.html
@@ -68,6 +68,7 @@
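End to end, the live-viewer path is: JsonLogHandler and StructuredLogWriter append JSONL, monitor_log_directory picks the newest webhooks_*.json, and tail_json_log_file feeds parsed entries to the UI. A hedged usage sketch of the tailer alone (the file name is made up; the generator seeks to end-of-file first, so it yields only entries appended after it starts, and with follow=True it runs until cancelled):

    import asyncio
    from pathlib import Path

    from webhook_server.libs.log_parser import LogParser


    async def follow_todays_log() -> None:
        parser = LogParser()
        log_file = Path("logs/webhooks_2025-07-31.json")
        async for entry in parser.tail_json_log_file(log_file, follow=True):
            print(entry.timestamp, entry.level, entry.message)


    asyncio.run(follow_todays_log())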