Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 151 additions & 19 deletions webhook_server/libs/log_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ def parse_log_file(self, file_path: Path) -> list[LogEntry]:
def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
"""Parse a JSONL log entry into a LogEntry object.

Parses JSONL format (one compact JSON object per line).
Routes JSON entries by their ``type`` field:
- ``"log_entry"`` -> individual structured log line
- ``"webhook_summary"`` (or missing) -> legacy webhook summary

Args:
json_line: Raw JSON string from webhooks_*.json files (single line)
Expand All @@ -329,6 +331,80 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
except json.JSONDecodeError:
return None

if not isinstance(data, dict):
return None

entry_type = data.get("type", "webhook_summary") # backward compat
if entry_type == "log_entry":
return self._parse_json_log_line(data)
return self._parse_json_webhook_summary(data)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _parse_json_log_line(self, data: dict[str, Any]) -> LogEntry | None:
    """Parse an individual JSON log line entry (type="log_entry") into a LogEntry.

    Expected JSON format::

        {
            "type": "log_entry",
            "timestamp": "ISO8601",
            "level": "INFO",
            "logger_name": "GithubWebhook",
            "message": "Processing webhook",
            "hook_id": "abc123",
            "event_type": "pull_request",
            "repository": "org/repo",
            "pr_number": 123,
            "api_user": "bot-user"
        }

    Args:
        data: Parsed JSON dictionary with type="log_entry"

    Returns:
        LogEntry object if parsing successful, None otherwise
    """
    # A parseable timestamp is mandatory. Reject missing AND non-string values:
    # a numeric timestamp would raise AttributeError on .replace() below, which
    # the (ValueError, TypeError) handler would not catch.
    timestamp_str = data.get("timestamp", "")
    if not isinstance(timestamp_str, str) or not timestamp_str:
        return None

    try:
        # Normalize a trailing "Z" to an explicit UTC offset so fromisoformat()
        # accepts it on Python versions that predate native "Z" support.
        timestamp = datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        if timestamp.tzinfo is None:
            # Treat naive timestamps as UTC so all entries compare consistently.
            timestamp = timestamp.replace(tzinfo=datetime.UTC)
    except (ValueError, TypeError):
        return None

    # Task markers are embedded in the message text; extract them and keep the
    # cleaned message for display.
    task_id, task_type, task_status, cleaned_message = self._extract_task_fields(data.get("message", ""))
    token_spend = self.extract_token_spend(cleaned_message)

    return LogEntry(
        timestamp=timestamp,
        level=data.get("level", "INFO"),
        logger_name=data.get("logger_name", "GithubWebhook"),
        message=cleaned_message,
        hook_id=data.get("hook_id"),
        event_type=data.get("event_type"),
        repository=data.get("repository"),
        pr_number=data.get("pr_number"),
        github_user=data.get("api_user"),
        task_id=task_id,
        task_type=task_type,
        task_status=task_status,
        token_spend=token_spend,
    )

def _parse_json_webhook_summary(self, data: dict[str, Any]) -> LogEntry | None:
"""Parse a webhook summary JSON entry into a LogEntry.

Handles legacy format where ``type`` is ``"webhook_summary"`` or missing.

Args:
data: Parsed JSON dictionary with type="webhook_summary" (or no type)

Returns:
LogEntry object if parsing successful, None otherwise
"""
# Parse timestamp from timing.started_at
try:
timing = data.get("timing", {})
Expand All @@ -345,21 +421,25 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
pr_data = data.get("pr") or {}
pr_number = pr_data.get("number") if pr_data else None

# Create summary message
message = self._create_json_summary_message(data)

# Derive task_status from success field
success = data.get("success")
if success is True:
task_status = "completed"
elif success is False:
task_status = "failed"
# Read status from new field, fall back to deriving from success (backward compat)
status = data.get("status")
if status:
task_status = status # "success", "failed", "partial"
else:
task_status = None
success = data.get("success")
if success is True:
task_status = "success"
elif success is False:
task_status = "failed"
else:
task_status = None
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Create summary message (after task_status resolution so it can use the status)
message = self._create_json_summary_message(data, task_status)

return LogEntry(
timestamp=timestamp,
level="INFO", # JSON logs don't have levels, default to INFO
level=data.get("level", "COMPLETED"),
logger_name="GithubWebhook",
Comment thread
coderabbitai[bot] marked this conversation as resolved.
message=message,
hook_id=data.get("hook_id"),
Expand All @@ -373,11 +453,12 @@ def parse_json_log_entry(self, json_line: str) -> LogEntry | None:
token_spend=data.get("token_spend"),
)

def _create_json_summary_message(self, data: dict[str, Any]) -> str:
def _create_json_summary_message(self, data: dict[str, Any], task_status: str | None = None) -> str:
"""Create a summary message from JSON log data.

Args:
data: Parsed JSON log data
task_status: Resolved task status ("success", "failed", "partial", or None)

Returns:
Human-readable summary message
Expand All @@ -399,7 +480,17 @@ def _create_json_summary_message(self, data: dict[str, Any]) -> str:
if pr_data and pr_data.get("number"):
parts.append(f"PR #{pr_data['number']}")

if data.get("success"):
if task_status == "partial":
parts.append("- completed with partial failures")
elif task_status == "success":
parts.append("- completed successfully")
elif task_status == "failed":
parts.append("- failed")
error = data.get("error")
error_type = error.get("type") if isinstance(error, dict) else None
if error_type:
parts.append(f"({error_type})")
elif data.get("success"):
parts.append("- completed successfully")
else:
parts.append("- failed")
Expand Down Expand Up @@ -465,10 +556,15 @@ def get_raw_json_entry(self, json_line: str) -> dict[str, Any] | None:
return None

try:
return json.loads(json_line)
data = json.loads(json_line)
except json.JSONDecodeError:
return None

if not isinstance(data, dict):
return None

return data

async def tail_log_file(self, file_path: Path, follow: bool = True) -> AsyncGenerator[LogEntry]:
"""
Tail a log file and yield new LogEntry objects as they are added.
Expand Down Expand Up @@ -501,13 +597,41 @@ async def tail_log_file(self, file_path: Path, follow: bool = True) -> AsyncGene
# Not following, exit when no more data
break

async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") -> AsyncGenerator[LogEntry]:
async def tail_json_log_file(self, file_path: Path, follow: bool = True) -> AsyncGenerator[LogEntry]:
    """Stream LogEntry objects from a JSONL file as new lines are appended.

    Args:
        file_path: Path to the JSONL file to monitor
        follow: Whether to continue monitoring for new entries

    Yields:
        LogEntry objects for new JSON log lines
    """
    if not file_path.exists():
        return

    with open(file_path, encoding="utf-8") as handle:
        # Seek to EOF so only lines written after we start are streamed.
        handle.seek(0, 2)

        while True:
            raw_line = handle.readline()
            if not raw_line:
                # No new data yet: either wait and poll again, or stop.
                if not follow:
                    break
                await asyncio.sleep(0.1)
                continue
            parsed = self.parse_json_log_entry(raw_line)
            if parsed:
                yield parsed

async def monitor_log_directory(self, log_dir: Path, pattern: str = "webhooks_*.json") -> AsyncGenerator[LogEntry]:
"""
Monitor a directory for log files and yield new entries from all files.

Args:
log_dir: Directory path containing log files
pattern: Glob pattern for log files (default: "*.log")
pattern: Glob pattern for log files (default: "webhooks_*.json")

Yields:
LogEntry objects from all monitored log files
Expand All @@ -518,6 +642,9 @@ async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") ->
# Find all existing log files including rotated ones
log_files: list[Path] = []
log_files.extend(log_dir.glob(pattern))
# Backward-compatible fallback for deployments that still stream plain .log files
if not log_files and pattern == "webhooks_*.json":
log_files.extend(log_dir.glob("*.log"))
# Only monitor current log file, not rotated ones for real-time
current_log_files = [
f for f in log_files if not any(f.name.endswith(ext) for ext in [".1", ".2", ".3", ".4", ".5"])
Expand All @@ -531,8 +658,13 @@ async def monitor_log_directory(self, log_dir: Path, pattern: str = "*.log") ->
current_log_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
most_recent_file = current_log_files[0]

async for entry in self.tail_log_file(most_recent_file, follow=True):
yield entry
# Use appropriate tailer based on file type
if most_recent_file.suffix == ".json":
async for entry in self.tail_json_log_file(most_recent_file, follow=True):
yield entry
else:
async for entry in self.tail_log_file(most_recent_file, follow=True):
yield entry


class LogFilter:
Expand Down
101 changes: 100 additions & 1 deletion webhook_server/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,11 @@ def test_to_dict_returns_correct_structure(self, mock_datetime):
assert result["initial_rate_limit"] == 5000
assert result["final_rate_limit"] == 4985

# Status
# Level and status
assert result["level"] == "COMPLETED"
assert result["status"] == "success"

# Success
assert result["success"] is True
assert result["error"] is None

Expand Down Expand Up @@ -391,11 +395,106 @@ def test_to_dict_with_error(self, mock_datetime):

result = ctx.to_dict()

assert result["level"] == "COMPLETED"
assert result["status"] == "failed"
assert result["success"] is False
assert result["error"] is not None
assert result["error"]["type"] == "ValueError"
assert result["error"]["message"] == "Something went wrong"

def test_derive_level_always_returns_completed(self):
    """_derive_level yields COMPLETED no matter what state the context is in."""
    context = WebhookContext(
        hook_id="hook-level-1",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )

    # Fresh context: no steps recorded, success defaults to True.
    assert context._derive_level() == "COMPLETED"

    # A failed workflow step must not change the level.
    context.workflow_steps["some_step"] = {"status": "failed"}
    assert context._derive_level() == "COMPLETED"

    # Neither should an overall failure flag.
    context.success = False
    assert context._derive_level() == "COMPLETED"

    # Nor a completed step that reports can_merge=False.
    context.workflow_steps["merge_check"] = {"status": "completed", "can_merge": False}
    assert context._derive_level() == "COMPLETED"

def test_derive_status_returns_success_when_all_ok(self):
    """_derive_status is 'success' when the webhook succeeded and every step did too."""
    context = WebhookContext(
        hook_id="hook-status-1",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )
    context.workflow_steps["step1"] = {"status": "completed", "can_merge": True}
    context.workflow_steps["step2"] = {"status": "completed", "success": True}

    assert context._derive_status() == "success"

def test_derive_status_returns_success_with_no_steps(self):
    """_derive_status defaults to 'success' when no workflow steps were recorded."""
    context = WebhookContext(
        hook_id="hook-status-2",
        event_type="push",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )

    assert context._derive_status() == "success"

def test_derive_status_returns_failed_when_success_is_false(self):
    """_derive_status is 'failed' whenever the overall success flag is False."""
    context = WebhookContext(
        hook_id="hook-status-3",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )
    context.success = False

    assert context._derive_status() == "failed"

def test_derive_status_returns_partial_when_step_failed(self):
    """_derive_status is 'partial' when the webhook succeeded but one step failed."""
    context = WebhookContext(
        hook_id="hook-status-4",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )
    # One step succeeded, one failed -> overall result is only partial.
    context.workflow_steps["good_step"] = {"status": "completed", "success": True}
    context.workflow_steps["bad_step"] = {"status": "failed"}

    assert context._derive_status() == "partial"

def test_derive_status_returns_partial_when_completed_step_not_successful(self):
    """_derive_status is 'partial' when a completed step carries a failure indicator."""
    context = WebhookContext(
        hook_id="hook-status-5",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )
    # Step completed, but can_merge=False marks it as not fully successful.
    context.workflow_steps["merge_check"] = {"status": "completed", "can_merge": False}

    assert context._derive_status() == "partial"

def test_derive_status_returns_partial_with_error_in_completed_step(self):
    """_derive_status is 'partial' when a completed step recorded an error field."""
    context = WebhookContext(
        hook_id="hook-status-6",
        event_type="pull_request",
        repository="owner/repo",
        repository_full_name="owner/repo",
    )
    # Completed status plus an error payload should downgrade to partial.
    context.workflow_steps["step_with_error"] = {
        "status": "completed",
        "error": "something went wrong",
    }

    assert context._derive_status() == "partial"


class TestContextManagement:
"""Tests for module-level context management functions."""
Expand Down
Loading