From fb851652d029081e58b39388500340fbd7cdb308 Mon Sep 17 00:00:00 2001 From: John Ajera Date: Tue, 11 Nov 2025 23:13:32 +1300 Subject: [PATCH] feat: initial commit first release --- .github/workflows/commitmsg-conform.yml | 11 + .github/workflows/markdown-lint.yml | 14 + .gitignore | 42 ++ README.md | 352 +++++++++ config/log_sources.yaml.example | 62 ++ dashboard/dashboard.py | 530 +++++++++++++ requirements.txt | 7 + scripts/analyze_logs.py | 624 +++++++++++++++ scripts/parse_logs.py | 347 +++++++++ scripts/query_logs.py | 256 +++++++ scripts/sync_logs.py | 145 ++++ src/__init__.py | 4 + src/analyze/__init__.py | 32 + src/analyze/analytics.py | 961 ++++++++++++++++++++++++ src/parse/__init__.py | 8 + src/parse/log_parser.py | 342 +++++++++ src/sync/__init__.py | 10 + src/sync/base.py | 48 ++ src/sync/s3_sync.py | 233 ++++++ src/sync/sync_manager.py | 71 ++ src/utils/__init__.py | 9 + src/utils/config_loader.py | 51 ++ src/utils/date_utils.py | 61 ++ 23 files changed, 4220 insertions(+) create mode 100644 .github/workflows/commitmsg-conform.yml create mode 100644 .github/workflows/markdown-lint.yml create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config/log_sources.yaml.example create mode 100644 dashboard/dashboard.py create mode 100644 requirements.txt create mode 100755 scripts/analyze_logs.py create mode 100755 scripts/parse_logs.py create mode 100755 scripts/query_logs.py create mode 100755 scripts/sync_logs.py create mode 100644 src/__init__.py create mode 100644 src/analyze/__init__.py create mode 100755 src/analyze/analytics.py create mode 100644 src/parse/__init__.py create mode 100755 src/parse/log_parser.py create mode 100644 src/sync/__init__.py create mode 100644 src/sync/base.py create mode 100644 src/sync/s3_sync.py create mode 100644 src/sync/sync_manager.py create mode 100644 src/utils/__init__.py create mode 100644 src/utils/config_loader.py create mode 100644 src/utils/date_utils.py diff --git a/.github/workflows/commitmsg-conform.yml b/.github/workflows/commitmsg-conform.yml new file mode 100644 index 0000000..8af1d71 --- /dev/null +++ b/.github/workflows/commitmsg-conform.yml @@ -0,0 +1,11 @@ +name: Commit Message Conformance +on: + pull_request: {} +permissions: + statuses: write + checks: write + contents: read + pull-requests: read +jobs: + commitmsg-conform: + uses: actionsforge/actions/.github/workflows/commitmsg-conform.yml@main diff --git a/.github/workflows/markdown-lint.yml b/.github/workflows/markdown-lint.yml new file mode 100644 index 0000000..034b809 --- /dev/null +++ b/.github/workflows/markdown-lint.yml @@ -0,0 +1,14 @@ +name: Markdown Lint + +on: + pull_request: {} + +permissions: + statuses: write + checks: write + contents: read + pull-requests: read + +jobs: + markdown-lint: + uses: actionsforge/actions/.github/workflows/markdown-lint.yml@main diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..360b974 --- /dev/null +++ b/.gitignore @@ -0,0 +1,42 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python + +# Virtual environments +venv/ +env/ +ENV/ + +# Logs and data +logs/ +*.log +*.log.gz + +# Configuration (contains sensitive/instance-specific data) +config/log_sources.yaml + +# Parsed data (optional - uncomment if you don't want to commit parsed logs) +# logs/parsed_logs.json +# logs/parsed_logs.csv + +# Analytics output (optional - uncomment if you don't want to commit analytics) +# logs/analytics.json + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# 
Temporary files +*.tmp +*.bak + diff --git a/README.md b/README.md new file mode 100644 index 0000000..098a4b6 --- /dev/null +++ b/README.md @@ -0,0 +1,352 @@ +# Fastly Log Sync and Analytics + +A comprehensive tool for syncing Fastly logs from S3, parsing them, and generating analytics reports. + +## ⚠️ Security Notice + +**This tool processes log files that may contain sensitive information including:** + +- IP addresses +- User agents +- Request paths and query parameters +- Response data +- Other potentially sensitive application data + +**Please ensure:** + +- Log files are stored securely and access is restricted +- Parsed log files are not committed to version control +- Configuration files with real bucket names/paths are gitignored +- Access to the dashboard and analytics is restricted to authorized personnel only + +## Features + +- **Date-filtered S3 sync**: Download logs for specific date ranges +- **Incremental sync**: Only downloads new files that don't already exist locally +- **UTC timezone handling**: All date operations use UTC to match log file organization +- **Log parsing**: Converts syslog-style Fastly logs to structured JSON/CSV +- **Comprehensive analytics**: Traffic patterns, error analysis, performance metrics, user agent analysis, query parameter patterns, endpoint drill-down, daily summaries, and slowness investigation +- **Interactive dashboard**: Streamlit-based web dashboard for visualizing log analytics +- **Modular design**: Run sync, parse, or analyze operations independently or as a pipeline + +## Prerequisites + +- **AWS CLI**: Required for S3 sync operations + - Install: [https://aws.amazon.com/cli/](https://aws.amazon.com/cli/) + - Configure credentials: `aws configure` or set environment variables +- **Python 3**: Required for parsing and analytics +- **Python packages**: Install with `pip install -r requirements.txt` + +## Installation + +1. Clone or download this repository +2. Install Python dependencies: + + ```bash + pip install -r requirements.txt + ``` + +3. Create your configuration file: + + ```bash + cp config/log_sources.yaml.example config/log_sources.yaml + ``` + + Then edit `config/log_sources.yaml` with your actual log source configurations (S3 buckets, paths, etc.). This file is gitignored and won't be committed. + +4. Ensure AWS CLI is installed and configured with appropriate credentials + +## Usage + +### Full Pipeline (Sync → Parse → Analyze) + +Run all operations in sequence: + +**From date to today** (syncs logs from the specified date to today, inclusive): + +```bash +python3 scripts/query_logs.py --date 2025-11-10 +``` + +**Date range** (syncs logs from start date to end date, inclusive): + +```bash +python3 scripts/query_logs.py --start-date 2025-11-10 --end-date 2025-11-12 +``` + +**Note**: The `--date` parameter syncs logs from the specified date **to today** (in UTC, since logs are stored in UTC). To sync a specific date range, use `--start-date` and `--end-date`. + +**Timezone Note**: All date operations use UTC to match the log file organization in S3. Log timestamps are preserved in UTC throughout parsing and analysis. 
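+
+As a concrete illustration of the date handling, the sketch below computes an inclusive UTC date range using only Python's standard `datetime` module. It is a minimal example of the behaviour described above, not necessarily the exact logic used inside `scripts/sync_logs.py`:
+
+```python
+from datetime import date, datetime, timedelta, timezone
+
+def utc_date_range(start, end=None):
+    """Return each YYYY-MM-DD day from start to end (inclusive), evaluated in UTC."""
+    start_day = date.fromisoformat(start)
+    # With no end date, fall back to "today" in UTC, matching how log files are keyed in S3.
+    end_day = date.fromisoformat(end) if end else datetime.now(timezone.utc).date()
+    return [(start_day + timedelta(days=n)).isoformat()
+            for n in range((end_day - start_day).days + 1)]
+
+print(utc_date_range("2025-11-10", "2025-11-12"))
+# ['2025-11-10', '2025-11-11', '2025-11-12']
+```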
+
+### Individual Operations
+
+#### Sync Only
+
+Download logs from S3 without parsing or analyzing:
+
+```bash
+python3 scripts/query_logs.py --operation sync --start-date 2025-11-10 --end-date 2025-11-12
+```
+
+Or use the sync script directly:
+
+```bash
+python3 scripts/sync_logs.py --start-date 2025-11-10 --end-date 2025-11-12
+```
+
+#### Parse Only
+
+Parse existing log files in the `logs/` directory:
+
+```bash
+python3 scripts/query_logs.py --operation parse
+```
+
+Or use the parser directly:
+
+```bash
+python3 scripts/parse_logs.py --input-dir ./logs/example-source-1/raw --output ./logs/example-source-1/parsed/parsed_logs.json
+```
+
+#### Analyze Only
+
+Generate analytics from already-parsed logs:
+
+```bash
+python3 scripts/query_logs.py --operation analyze --parsed-output ./logs/example-source-1/parsed/parsed_logs.json
+```
+
+Or use the analyzer directly:
+
+```bash
+python3 scripts/analyze_logs.py --input ./logs/example-source-1/parsed/parsed_logs.json --format console
+```
+
+#### Interactive Dashboard
+
+Launch the Streamlit dashboard for interactive visualization:
+
+```bash
+streamlit run dashboard/dashboard.py
+```
+
+The dashboard will open in your browser (typically at `http://localhost:8501`) and provides:
+
+- **Traffic Patterns**: Requests over time, popular endpoints, HTTP method distribution
+- **Error Analysis**: Status code breakdown, error rates, error-prone endpoints
+- **Performance Metrics**: Cache hit/miss rates, response size statistics
+- **User Agent Analysis**: Top user agents and agent type distribution
+- **Query Patterns**: Most common query parameters and value distributions
+- **Slowness Investigation**: Cache miss patterns, large response endpoints, peak traffic times
+
+You can specify a custom parsed log file path in the dashboard sidebar.
+
+## Command-Line Options
+
+### query_logs.py
+
+- `--start-date DATE`: Start date in YYYY-MM-DD format (required for sync unless `--date` is used)
+- `--end-date DATE`: End date in YYYY-MM-DD format (required for sync unless `--date` is used)
+- `--date DATE`: Start date in YYYY-MM-DD format (syncs from this date to today)
+- `--operation OP`: Operation to perform: `sync`, `parse`, `analyze`, or `all` (default: `all`)
+- `--parsed-output FILE`: Output file for parsed logs (default: first enabled source's parsed output)
+- `--analytics-output FILE`: Output file for analytics report (optional)
+
+### sync_logs.py
+
+- `--start-date DATE`: Start date in YYYY-MM-DD format
+- `--end-date DATE`: End date in YYYY-MM-DD format
+- `--date DATE`: Start date in YYYY-MM-DD format (syncs from this date to today)
+- `-h, --help`: Show help message
+
+### parse_logs.py
+
+- `--input-dir DIR`: Directory containing log files (default: `./logs/example-source-1/raw`)
+- `--output FILE`: Output file path (default: `./logs/example-source-1/parsed/parsed_logs.json`)
+- `--format FORMAT`: Output format: `json` or `csv` (default: `json`)
+- `--pattern PATTERN`: File pattern to match (default: `*.log*`)
+
+### analyze_logs.py
+
+- `--input FILE`: Input file (parsed JSON or CSV) - **required**
+- `--output FILE`: Output file path (optional)
+- `--format FORMAT`: Output format: `json` or `console` (default: `console`)
+
+## Log Format
+
+The tool parses Fastly logs in syslog format.
All timestamps are in UTC (indicated by the 'Z' suffix): + +```plaintext +timestamp cache-server process[pid]: IP "-" "-" date "METHOD path" status size "-" "user-agent" cache-status +``` + +Example: + +```plaintext +<134>2025-11-09T23:57:35Z cache-server-001 s3logsprod[254840]: 192.0.2.1 "-" "-" Sun, 09 Nov 2025 23:57:35 GMT "GET /api/endpoint?param=value" 200 18508 "-" "Mozilla/5.0..." hit +``` + +**Note**: The example above uses dummy IP addresses (192.0.2.1 is a reserved test IP) and generic paths. Real log files will contain actual client IPs and request paths. + +## Output Structure + +### Parsed Logs (JSON) + +Each log entry is parsed into structured fields: + +```json +{ + "priority": 134, + "timestamp": "2025-11-09T23:57:35", + "cache_server": "cache-server-001", + "ip_address": "192.0.2.1", + "http_method": "GET", + "path": "/api/endpoint", + "query_string": "param=value", + "query_params": { "param": "value" }, + "status_code": 200, + "response_size": 18508, + "user_agent": "Mozilla/5.0...", + "cache_status": "hit" +} +``` + +**Note**: The example above uses dummy data. Real parsed logs will contain actual IP addresses, paths, and query parameters from your log files. + +### Analytics Report + +The analytics report includes: + +- **Traffic Patterns**: Total requests, requests per hour/day/minute, popular endpoints, HTTP method distribution, peak traffic detection +- **Error Analysis**: Status code distribution, 4xx/5xx error rates, error-prone endpoints, hourly error breakdowns +- **Performance Metrics**: Cache hit/miss rates, response size statistics, hourly cache performance, hourly response size trends +- **User Agent Analysis**: Top user agents, agent type distribution +- **Query Patterns**: Most common query parameters, parameter value distributions, top query signatures +- **Slowness Investigation**: Traffic spikes, cache miss patterns, large response endpoints, peak traffic times, rate of change analysis +- **Endpoint Drill-Down**: Detailed analysis for specific endpoints (time patterns, errors, cache, query params) +- **Daily Summaries**: Daily request totals with status code breakdown by day + +## File Structure + +```plaintext +fastly_log_query/ +├── config/ +│ ├── log_sources.yaml.example # Example configuration template +│ └── log_sources.yaml # Log source configurations (gitignored, copy from .example) +├── logs/ # Local log storage (created automatically, gitignored) +│ └── example-source-1/ +│ ├── raw/ # Raw log files from S3 (gitignored) +│ └── parsed/ # Parsed log files (JSON/CSV, gitignored) +├── dashboard/ +│ └── dashboard.py # Streamlit interactive dashboard +├── scripts/ +│ ├── query_logs.py # Main orchestration script +│ ├── sync_logs.py # S3 sync script +│ ├── parse_logs.py # Log parser +│ └── analyze_logs.py # Analytics engine +├── src/ # Source code modules +│ ├── sync/ # Sync implementations +│ ├── parse/ # Parsing logic +│ ├── analyze/ # Analysis functions +│ └── utils/ # Utility functions +├── requirements.txt # Python dependencies +└── README.md # This file +``` + +## Troubleshooting + +### AWS Credentials Not Configured + +If you see an error about AWS credentials: + +```bash +aws configure +``` + +Or set environment variables: + +```bash +export AWS_ACCESS_KEY_ID=your_key +export AWS_SECRET_ACCESS_KEY=your_secret +export AWS_DEFAULT_REGION=ap-southeast-2 +``` + +### Python Dependencies Missing + +Install required packages: + +```bash +pip install -r requirements.txt +``` + +### No Log Files Found + +Ensure: + +1. 
The sync operation completed successfully +2. Log files are in the `logs/` directory +3. Files match the expected naming pattern (e.g., `2025-11-10T00:00:00.000--*.log.gz`) + +### Security and Privacy + +**Important considerations when working with log data:** + +1. **Log files contain sensitive data**: IP addresses, user agents, request paths, and potentially sensitive query parameters +2. **Storage**: Ensure log files are stored securely with appropriate access controls +3. **Version control**: Never commit log files, parsed logs, or configuration files with real credentials/bucket names to git +4. **Access control**: Restrict access to the dashboard and analytics output to authorized personnel only +5. **Data retention**: Consider implementing data retention policies for log files +6. **Compliance**: Ensure log processing complies with your organization's data privacy policies and regulations + +The `.gitignore` file is configured to exclude: + +- `config/log_sources.yaml` (your actual configuration) +- `logs/` directory (all log files) +- Parsed log outputs + +## Examples + +### Example 1: Analyze logs from a date to today + +```bash +python3 scripts/query_logs.py --date 2025-11-10 # Syncs from 2025-11-10 to today +``` + +### Example 2: Sync logs for a week, then analyze separately + +```bash +# Sync +python3 scripts/query_logs.py --operation sync --start-date 2025-11-10 --end-date 2025-11-16 + +# Parse +python3 scripts/query_logs.py --operation parse + +# Analyze with custom output +python3 scripts/query_logs.py --operation analyze --analytics-output ./reports/weekly_report.json +``` + +### Example 3: Parse and analyze existing logs + +```bash +# Parse +python3 scripts/parse_logs.py --input-dir ./logs/example-source-1/raw --output ./logs/example-source-1/parsed/parsed.json --format json + +# Analyze +python3 scripts/analyze_logs.py --input ./logs/example-source-1/parsed/parsed.json --format console --output ./reports/analysis.txt +``` + +### Example 4: Use the interactive dashboard + +```bash +# First, parse your logs (if not already done) +python3 scripts/parse_logs.py --input-dir ./logs/example-source-1/raw --output ./logs/example-source-1/parsed/parsed_logs.json + +# Then launch the dashboard +streamlit run dashboard/dashboard.py +``` + +The dashboard will automatically load the parsed logs and display interactive visualizations. You can change the log file path in the sidebar if needed. + +## License + +This tool is provided as-is for internal use. 
diff --git a/config/log_sources.yaml.example b/config/log_sources.yaml.example new file mode 100644 index 0000000..347abfa --- /dev/null +++ b/config/log_sources.yaml.example @@ -0,0 +1,62 @@ +# Log Sources Configuration +# Define all log sources that can be synced +# +# Copy this file to log_sources.yaml and configure with your actual values +# log_sources.yaml is gitignored and will not be committed + +log_sources: + example-source-1: + type: s3 + enabled: true + description: "Example log source 1" + s3_bucket: "s3://your-bucket-name/path/to/logs" + local_dir: "logs/example-source-1/raw" + parsed_dir: "logs/example-source-1/parsed" + credentials: + # Use AWS credentials from environment/aws-vault + # Optional: specify AWS profile name + profile: null # Set to "prod" if using aws-vault or named profiles + example-source-2: + type: s3 + enabled: true + description: "Example log source 2" + s3_bucket: "s3://your-bucket-name/path/to/other-logs" + local_dir: "logs/example-source-2/raw" + parsed_dir: "logs/example-source-2/parsed" + credentials: + # Use AWS credentials from environment/aws-vault + # Optional: specify AWS profile name + profile: null # Set to "prod" if using aws-vault or named profiles + example-source-3: + type: s3 + enabled: false + description: "Example disabled source" + s3_bucket: "s3://your-bucket-name/path/to/disabled-logs" + local_dir: "logs/example-source-3/raw" + parsed_dir: "logs/example-source-3/parsed" + credentials: + # Use AWS credentials from environment/aws-vault + # Optional: specify AWS profile name + profile: null # Set to "prod" if using aws-vault or named profiles + + + # Example: Add more sources as needed + # staging-logs: + # type: s3 + # enabled: false + # description: "Staging environment logs" + # s3_bucket: "s3://staging-bucket/logs/" + # local_dir: "logs/staging/raw" + # parsed_dir: "logs/staging/parsed" + # credentials: + # profile: "staging" + + # production-logs: + # type: s3 + # enabled: false + # description: "Production environment logs" + # s3_bucket: "s3://prod-bucket/logs/" + # local_dir: "logs/production/raw" + # parsed_dir: "logs/production/parsed" + # credentials: + # profile: "production" diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py new file mode 100644 index 0000000..d621213 --- /dev/null +++ b/dashboard/dashboard.py @@ -0,0 +1,530 @@ +#!/usr/bin/env python3 +""" +Fastly Log Analytics Dashboard +Interactive Streamlit dashboard for analyzing Fastly log data. 
+""" + +import sys +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import streamlit as st +import pandas as pd +import plotly.express as px +import plotly.graph_objects as go +from datetime import datetime +from typing import Dict, List + +from src.analyze.analytics import ( + load_data, + analyze_traffic_patterns, + analyze_errors, + analyze_performance, + analyze_user_agents, + analyze_query_patterns, + analyze_slowness_patterns +) +from src.utils.config_loader import load_config, get_enabled_sources + +# Page configuration +st.set_page_config( + page_title="Fastly Log Analytics", + page_icon="📊", + layout="wide", + initial_sidebar_state="expanded" +) + +# Custom CSS for better styling +st.markdown(""" + +""", unsafe_allow_html=True) + + +@st.cache_data +def load_log_data(file_path: Path) -> List[Dict]: + """Load and cache log data.""" + return load_data(file_path) + + +def format_number(num: int) -> str: + """Format large numbers with commas.""" + return f"{num:,}" + + +def create_time_series_chart(data: Dict[str, int], title: str, x_label: str = "Time", y_label: str = "Requests"): + """Create a time series chart from data.""" + if not data: + return None + + # Convert keys to datetime if possible + try: + times = [datetime.fromisoformat(k.replace('Z', '+00:00')) for k in data.keys()] + df = pd.DataFrame({'time': times, 'count': list(data.values())}) + df = df.sort_values('time') + except: + # If datetime conversion fails, use as-is + df = pd.DataFrame({'time': list(data.keys()), 'count': list(data.values())}) + + fig = px.line(df, x='time', y='count', title=title) + fig.update_layout( + xaxis_title=x_label, + yaxis_title=y_label, + hovermode='x unified', + height=400 + ) + return fig + + +def create_bar_chart(data: Dict, title: str, x_label: str = "Item", y_label: str = "Count", limit: int = 20): + """Create a bar chart from data.""" + if not data: + return None + + # Sort by value and limit + sorted_data = dict(sorted(data.items(), key=lambda x: x[1], reverse=True)[:limit]) + + df = pd.DataFrame({ + x_label: list(sorted_data.keys()), + y_label: list(sorted_data.values()) + }) + + fig = px.bar(df, x=x_label, y=y_label, title=title) + fig.update_layout( + xaxis_title=x_label, + yaxis_title=y_label, + height=400, + xaxis={'categoryorder': 'total descending'} + ) + return fig + + +def create_pie_chart(data: Dict, title: str): + """Create a pie chart from data.""" + if not data: + return None + + df = pd.DataFrame({ + 'label': list(data.keys()), + 'value': list(data.values()) + }) + + fig = px.pie(df, values='value', names='label', title=title) + fig.update_layout(height=400) + return fig + + +def main(): + st.title("📊 Fastly Log Analytics Dashboard") + + # Sidebar for file selection + st.sidebar.header("Configuration") + + # Get enabled sources from config + try: + sources = load_config() + enabled_sources = get_enabled_sources(sources) + + # Build list of available parsed log files + available_files = {} + for name, config in enabled_sources.items(): + parsed_dir = config.get('parsed_dir', f"logs/{name}/parsed") + output_file = f"{parsed_dir}/parsed_logs.json" + available_files[name] = output_file + + # Source selection + if available_files: + selected_source = st.sidebar.selectbox( + "Select Log Source", + options=list(available_files.keys()), + help="Choose which log source to analyze" + ) + default_path = available_files[selected_source] + else: + # Fallback if no sources configured + default_path = 
"logs/srv_quakesearch-fastly/parsed/parsed_logs.json" + selected_source = None + except Exception as e: + st.sidebar.warning(f"⚠️ Could not load config: {e}") + default_path = "logs/srv_quakesearch-fastly/parsed/parsed_logs.json" + selected_source = None + + # Allow custom path override + custom_path = st.sidebar.text_input( + "Custom Parsed Logs File Path (optional)", + value="", + help="Override with a custom path to a parsed JSON log file" + ) + + # Use custom path if provided, otherwise use selected source + if custom_path and custom_path.strip(): + log_file_path = Path(custom_path.strip()) + else: + log_file_path = Path(default_path) + + if not log_file_path.exists(): + st.error(f"❌ Log file not found: {log_file_path}") + st.info("💡 Make sure you've parsed your logs first using `python3 scripts/parse_logs.py`") + return + + # Load data + with st.spinner("Loading log data... This may take a moment for large files."): + try: + entries = load_log_data(log_file_path) + st.sidebar.success(f"✅ Loaded {format_number(len(entries))} log entries") + except Exception as e: + st.error(f"❌ Error loading data: {e}") + return + + if not entries: + st.warning("⚠️ No log entries found in the file.") + return + + # Generate analytics + with st.spinner("Generating analytics..."): + analytics = { + 'traffic': analyze_traffic_patterns(entries), + 'errors': analyze_errors(entries), + 'performance': analyze_performance(entries), + 'user_agents': analyze_user_agents(entries), + 'query_patterns': analyze_query_patterns(entries), + 'slowness': analyze_slowness_patterns(entries) + } + + # Main metrics row + st.markdown("---") + col1, col2, col3, col4 = st.columns(4) + + with col1: + st.metric("Total Requests", format_number(analytics['traffic'].get('total_requests', 0))) + + with col2: + error_rate = analytics['errors'].get('total_error_rate', 0) + st.metric("Error Rate", f"{error_rate:.2f}%") + + with col3: + cache_hit_rate = analytics['performance'].get('cache_hit_rate', 0) + st.metric("Cache Hit Rate", f"{cache_hit_rate:.2f}%") + + with col4: + error_4xx = analytics['errors'].get('error_4xx_count', 0) + error_5xx = analytics['errors'].get('error_5xx_count', 0) + st.metric("4xx/5xx Errors", f"{error_4xx}/{error_5xx}") + + st.markdown("---") + + # Tabs for different analytics sections + tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs([ + "📈 Traffic", "❌ Errors", "⚡ Performance", "👤 User Agents", "🔍 Query Patterns", "🐌 Slowness" + ]) + + # Tab 1: Traffic Patterns + with tab1: + st.header("Traffic Patterns") + + col1, col2 = st.columns(2) + + with col1: + # Requests per day + if analytics['traffic'].get('requests_per_day'): + fig = create_time_series_chart( + analytics['traffic']['requests_per_day'], + "Requests Per Day" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + with col2: + # Requests per hour + if analytics['traffic'].get('requests_per_hour'): + fig = create_time_series_chart( + analytics['traffic']['requests_per_hour'], + "Requests Per Hour" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # Popular endpoints + if analytics['traffic'].get('popular_endpoints'): + st.subheader("Top Endpoints") + fig = create_bar_chart( + analytics['traffic']['popular_endpoints'], + "Most Requested Endpoints", + "Endpoint", + "Requests" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # HTTP methods + if analytics['traffic'].get('http_methods'): + st.subheader("HTTP Method Distribution") + col1, col2 = st.columns([2, 1]) + with col1: + fig = create_pie_chart( + analytics['traffic']['http_methods'], 
+ "HTTP Methods" + ) + if fig: + st.plotly_chart(fig, width='stretch') + with col2: + st.dataframe( + pd.DataFrame({ + 'Method': list(analytics['traffic']['http_methods'].keys()), + 'Count': list(analytics['traffic']['http_methods'].values()) + }), + width='stretch' + ) + + # Tab 2: Error Analysis + with tab2: + st.header("Error Analysis") + + col1, col2 = st.columns(2) + + with col1: + st.subheader("Status Code Distribution") + if analytics['errors'].get('status_code_distribution'): + fig = create_bar_chart( + analytics['errors']['status_code_distribution'], + "HTTP Status Codes", + "Status Code", + "Count" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + with col2: + st.subheader("Error Rates") + error_data = { + '4xx Errors': analytics['errors'].get('error_4xx_rate', 0), + '5xx Errors': analytics['errors'].get('error_5xx_rate', 0), + 'Success (2xx/3xx)': 100 - analytics['errors'].get('total_error_rate', 0) + } + fig = create_pie_chart(error_data, "Error Rate Breakdown") + if fig: + st.plotly_chart(fig, width='stretch') + + st.metric("4xx Error Rate", f"{analytics['errors'].get('error_4xx_rate', 0):.2f}%") + st.metric("5xx Error Rate", f"{analytics['errors'].get('error_5xx_rate', 0):.2f}%") + + # Error-prone endpoints + if analytics['errors'].get('error_endpoints'): + st.subheader("Most Error-Prone Endpoints") + fig = create_bar_chart( + analytics['errors']['error_endpoints'], + "Endpoints with Most Errors", + "Endpoint", + "Error Count" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # Tab 3: Performance + with tab3: + st.header("Performance Metrics") + + col1, col2 = st.columns(2) + + with col1: + st.subheader("Cache Statistics") + cache_stats = analytics['performance'].get('cache_statistics', {}) + if cache_stats: + fig = create_pie_chart(cache_stats, "Cache Status Distribution") + if fig: + st.plotly_chart(fig, width='stretch') + + st.metric("Cache Hit Rate", f"{analytics['performance'].get('cache_hit_rate', 0):.2f}%") + st.metric("Cache Miss Rate", f"{analytics['performance'].get('cache_miss_rate', 0):.2f}%") + + with col2: + st.subheader("Response Size Statistics") + size_stats = analytics['performance'].get('response_size_statistics', {}) + if size_stats: + metrics_data = { + 'Mean': f"{size_stats.get('mean', 0):,.0f} bytes", + 'Median': f"{size_stats.get('median', 0):,.0f} bytes", + 'P95': f"{size_stats.get('p95', 0):,.0f} bytes", + 'P99': f"{size_stats.get('p99', 0):,.0f} bytes", + 'Min': f"{size_stats.get('min', 0):,} bytes", + 'Max': f"{size_stats.get('max', 0):,} bytes" + } + for label, value in metrics_data.items(): + st.metric(label, value) + + # Top endpoints by response size + if analytics['performance'].get('top_endpoints_by_size'): + st.subheader("Endpoints with Largest Response Sizes") + endpoints_data = analytics['performance']['top_endpoints_by_size'] + df = pd.DataFrame([ + { + 'Endpoint': endpoint, + 'Mean Size (bytes)': int(data['mean_size']), + 'Request Count': int(data['request_count']) + } + for endpoint, data in endpoints_data.items() + ]) + st.dataframe(df, width='stretch', hide_index=True) + + # Tab 4: User Agents + with tab4: + st.header("User Agent Analysis") + + col1, col2 = st.columns(2) + + with col1: + st.subheader("Top User Agents") + if analytics['user_agents'].get('top_user_agents'): + # Show top 10 in chart + top_10 = dict(list(analytics['user_agents']['top_user_agents'].items())[:10]) + fig = create_bar_chart( + top_10, + "Top 10 User Agents", + "User Agent", + "Requests" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + 
with col2: + st.subheader("Agent Type Distribution") + if analytics['user_agents'].get('agent_type_distribution'): + fig = create_pie_chart( + analytics['user_agents']['agent_type_distribution'], + "User Agent Types" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # Full user agent list + if analytics['user_agents'].get('top_user_agents'): + st.subheader("All User Agents (Top 20)") + df = pd.DataFrame({ + 'User Agent': list(analytics['user_agents']['top_user_agents'].keys())[:20], + 'Count': list(analytics['user_agents']['top_user_agents'].values())[:20] + }) + st.dataframe(df, width='stretch', hide_index=True) + + # Tab 5: Query Patterns + with tab5: + st.header("Query Parameter Patterns") + + if analytics['query_patterns'].get('most_common_parameters'): + st.subheader("Most Common Query Parameters") + fig = create_bar_chart( + analytics['query_patterns']['most_common_parameters'], + "Query Parameters Usage", + "Parameter", + "Occurrences" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # Parameter value distributions + if analytics['query_patterns'].get('parameter_value_distributions'): + st.subheader("Parameter Value Distributions") + param_values = analytics['query_patterns']['parameter_value_distributions'] + + selected_param = st.selectbox( + "Select Parameter", + options=list(param_values.keys()) + ) + + if selected_param: + fig = create_bar_chart( + param_values[selected_param], + f"Value Distribution for '{selected_param}'", + "Value", + "Count" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # Tab 6: Slowness Investigation + with tab6: + st.header("Slowness Investigation") + + col1, col2 = st.columns(2) + + with col1: + if analytics['slowness'].get('requests_by_hour'): + st.subheader("Requests by Hour") + # Convert to proper format for chart + hour_data = {f"{k:02d}:00": v for k, v in analytics['slowness']['requests_by_hour'].items()} + fig = create_bar_chart( + hour_data, + "Request Volume by Hour", + "Hour", + "Requests" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + with col2: + if analytics['slowness'].get('requests_by_day_of_week'): + st.subheader("Requests by Day of Week") + fig = create_bar_chart( + analytics['slowness']['requests_by_day_of_week'], + "Request Volume by Day", + "Day", + "Requests" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + # High cache miss endpoints + if analytics['slowness'].get('high_cache_miss_endpoints'): + st.subheader("Endpoints with High Cache Miss Rates") + miss_data = analytics['slowness']['high_cache_miss_endpoints'] + df = pd.DataFrame({ + 'Endpoint': list(miss_data.keys()), + 'Cache Miss Rate (%)': [f"{v:.2f}" for v in miss_data.values()] + }) + st.dataframe(df, width='stretch', hide_index=True) + + # Large response endpoints + if analytics['slowness'].get('large_response_endpoints'): + st.subheader("Endpoints with Largest Response Sizes") + large_resp = analytics['slowness']['large_response_endpoints'] + df = pd.DataFrame([ + { + 'Endpoint': endpoint, + 'Mean Size (bytes)': int(data['mean_size']), + 'Max Size (bytes)': int(data['max_size']), + 'Request Count': int(data['request_count']) + } + for endpoint, data in large_resp.items() + ]) + st.dataframe(df, width='stretch', hide_index=True) + + # Cache miss rate by hour + if analytics['slowness'].get('cache_miss_rate_by_hour'): + st.subheader("Cache Miss Rate by Hour") + miss_by_hour = {f"{k:02d}:00": v for k, v in analytics['slowness']['cache_miss_rate_by_hour'].items()} + fig = create_bar_chart( + miss_by_hour, + "Cache Miss Rate by 
Hour", + "Hour", + "Miss Rate (%)" + ) + if fig: + st.plotly_chart(fig, width='stretch') + + +if __name__ == "__main__": + main() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..18b8fe4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pandas>=2.0.0 +numpy>=1.24.0 +boto3>=1.28.0 +ipwhois>=1.2.0 +pyyaml>=6.0.0 +streamlit>=1.28.0 +plotly>=5.17.0 diff --git a/scripts/analyze_logs.py b/scripts/analyze_logs.py new file mode 100755 index 0000000..55114c7 --- /dev/null +++ b/scripts/analyze_logs.py @@ -0,0 +1,624 @@ +#!/usr/bin/env python3 +""" +Fastly Log Analytics +Generates analytics reports from parsed Fastly log data. +""" + +import json +import csv +import argparse +import sys +from pathlib import Path +from collections import Counter, defaultdict +from datetime import datetime +from typing import Dict, List, Optional +import pandas as pd +import numpy as np + + +def load_data(input_path: Path) -> List[Dict]: + """Load parsed log data from JSON or CSV file.""" + if input_path.suffix == '.json': + with open(input_path, 'r') as f: + return json.load(f) + elif input_path.suffix == '.csv': + df = pd.read_csv(input_path) + # Convert query_params string back to dict if needed + if 'query_params' in df.columns: + df['query_params'] = df['query_params'].apply( + lambda x: json.loads(x) if isinstance(x, str) else {} + ) + return df.to_dict('records') + else: + raise ValueError(f"Unsupported file format: {input_path.suffix}") + + +def analyze_traffic_patterns(entries: List[Dict]) -> Dict: + """Analyze traffic patterns.""" + if not entries: + return {} + + # Convert to DataFrame for easier analysis + df = pd.DataFrame(entries) + + # Handle missing timestamps + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df_with_timestamp = df[df['timestamp'].notna()] + + if len(df_with_timestamp) > 0: + # Requests per hour + df_with_timestamp['hour'] = df_with_timestamp['timestamp'].dt.floor('h') + requests_per_hour = df_with_timestamp.groupby('hour').size().to_dict() + + # Requests per day + df_with_timestamp['day'] = df_with_timestamp['timestamp'].dt.date + requests_per_day = {str(k): int(v) for k, v in df_with_timestamp.groupby('day').size().to_dict().items()} + else: + requests_per_hour = {} + requests_per_day = {} + else: + requests_per_hour = {} + requests_per_day = {} + + # Popular endpoints (handle None values) + if 'path' in df.columns: + endpoint_counts = df['path'].dropna().value_counts().head(20).to_dict() + else: + endpoint_counts = {} + + # Popular HTTP methods (handle None values) + if 'http_method' in df.columns: + method_counts = df['http_method'].dropna().value_counts().to_dict() + else: + method_counts = {} + + return { + 'total_requests': len(entries), + 'requests_per_hour': {str(k): int(v) for k, v in requests_per_hour.items()}, + 'requests_per_day': requests_per_day, + 'popular_endpoints': {k: int(v) for k, v in endpoint_counts.items()}, + 'http_methods': {k: int(v) for k, v in method_counts.items()} + } + + +def analyze_errors(entries: List[Dict]) -> Dict: + """Analyze error patterns.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Status code distribution (handle None values) + if 'status_code' in df.columns: + status_counts = df['status_code'].dropna().value_counts().to_dict() + df_with_status = df[df['status_code'].notna()] + + # Error rates + total = len(df_with_status) + error_4xx = len(df_with_status[df_with_status['status_code'].between(400, 499)]) + error_5xx = 
len(df_with_status[df_with_status['status_code'].between(500, 599)]) + + # Most common errors + error_entries = df_with_status[df_with_status['status_code'] >= 400] + if 'path' in error_entries.columns: + error_endpoints = error_entries['path'].dropna().value_counts().head(10).to_dict() + else: + error_endpoints = {} + else: + status_counts = {} + total = 0 + error_4xx = 0 + error_5xx = 0 + error_endpoints = {} + + return { + 'status_code_distribution': {str(k): int(v) for k, v in status_counts.items()}, + 'total_requests': int(total), + 'error_4xx_count': int(error_4xx), + 'error_4xx_rate': float(error_4xx / total * 100) if total > 0 else 0, + 'error_5xx_count': int(error_5xx), + 'error_5xx_rate': float(error_5xx / total * 100) if total > 0 else 0, + 'total_error_rate': float((error_4xx + error_5xx) / total * 100) if total > 0 else 0, + 'error_endpoints': {k: int(v) for k, v in error_endpoints.items()} + } + + +def analyze_performance(entries: List[Dict]) -> Dict: + """Analyze performance metrics.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Cache hit/miss rates (handle None values) + if 'cache_status' in df.columns: + cache_counts = df['cache_status'].dropna().value_counts().to_dict() + total = len(df[df['cache_status'].notna()]) + hit_count = cache_counts.get('hit', 0) + miss_count = cache_counts.get('miss', 0) + else: + cache_counts = {} + total = 0 + hit_count = 0 + miss_count = 0 + + # Response size statistics (handle None values) + if 'response_size' in df.columns: + df_with_size = df[df['response_size'].notna()] + if len(df_with_size) > 0: + response_size_stats = { + 'mean': float(df_with_size['response_size'].mean()), + 'median': float(df_with_size['response_size'].median()), + 'min': int(df_with_size['response_size'].min()), + 'max': int(df_with_size['response_size'].max()), + 'p95': float(df_with_size['response_size'].quantile(0.95)), + 'p99': float(df_with_size['response_size'].quantile(0.99)) + } + + # Response size by endpoint + if 'path' in df_with_size.columns: + endpoint_sizes = df_with_size.groupby('path')['response_size'].agg(['mean', 'count']).to_dict('index') + top_endpoints_by_size = dict(sorted( + endpoint_sizes.items(), + key=lambda x: x[1]['mean'], + reverse=True + )[:10]) + else: + top_endpoints_by_size = {} + else: + response_size_stats = {} + top_endpoints_by_size = {} + else: + response_size_stats = {} + top_endpoints_by_size = {} + + return { + 'cache_statistics': {k: int(v) for k, v in cache_counts.items()}, + 'cache_hit_rate': float(hit_count / total * 100) if total > 0 else 0, + 'cache_miss_rate': float(miss_count / total * 100) if total > 0 else 0, + 'response_size_statistics': response_size_stats, + 'top_endpoints_by_size': { + k: {'mean_size': float(v['mean']), 'request_count': int(v['count'])} + for k, v in top_endpoints_by_size.items() + } + } + + +def analyze_user_agents(entries: List[Dict]) -> Dict: + """Analyze user agent patterns.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Top user agents (handle None values) + if 'user_agent' in df.columns: + top_user_agents = df['user_agent'].dropna().value_counts().head(20).to_dict() + + # Extract browser/agent type + def extract_agent_type(ua): + # Handle None, NaN, and empty values + if pd.isna(ua) or ua is None or (isinstance(ua, str) and not ua): + return 'Unknown' + # Convert to string in case it's not already + ua_str = str(ua) if not isinstance(ua, str) else ua + ua_lower = ua_str.lower() + if 'mozilla' in ua_lower and 'firefox' in ua_lower: + return 
'Firefox' + elif 'chrome' in ua_lower and 'safari' in ua_lower: + return 'Chrome' + elif 'safari' in ua_lower and 'chrome' not in ua_lower: + return 'Safari' + elif 'python-requests' in ua_lower: + return 'Python/requests' + elif 'curl' in ua_lower: + return 'curl' + elif 'datadog' in ua_lower: + return 'Datadog' + else: + return 'Other' + + df['agent_type'] = df['user_agent'].apply(extract_agent_type) + agent_type_counts = df['agent_type'].value_counts().to_dict() + else: + top_user_agents = {} + agent_type_counts = {} + + return { + 'top_user_agents': {k: int(v) for k, v in top_user_agents.items()}, + 'agent_type_distribution': {k: int(v) for k, v in agent_type_counts.items()} + } + + +def analyze_query_patterns(entries: List[Dict]) -> Dict: + """Analyze query parameter patterns.""" + if not entries: + return {} + + # Collect all query parameters + param_counts = Counter() + param_value_counts = defaultdict(Counter) + + for entry in entries: + query_params = entry.get('query_params', {}) + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + for param, value in query_params.items(): + param_counts[param] += 1 + param_value_counts[param][value] += 1 + + # Most common parameters + top_params = dict(param_counts.most_common(20)) + + # Most common values for top parameters + top_param_values = {} + for param in list(top_params.keys())[:10]: + top_param_values[param] = dict(param_value_counts[param].most_common(10)) + + return { + 'most_common_parameters': {k: int(v) for k, v in top_params.items()}, + 'parameter_value_distributions': { + k: {vk: int(vv) for vk, vv in v.items()} + for k, v in top_param_values.items() + } + } + + +def analyze_slowness_patterns(entries: List[Dict]) -> Dict: + """Analyze patterns that might indicate slowness issues.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + results = {} + + # 1. Time-based patterns (when does slowness occur?) + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df_with_time = df[df['timestamp'].notna()].copy() + + if len(df_with_time) > 0: + df_with_time['hour'] = df_with_time['timestamp'].dt.hour + df_with_time['day_of_week'] = df_with_time['timestamp'].dt.day_name() + + # Requests per hour (identify peak times) + requests_by_hour = df_with_time.groupby('hour').size().to_dict() + results['requests_by_hour'] = {int(k): int(v) for k, v in requests_by_hour.items()} + + # Requests by day of week + requests_by_dow = df_with_time.groupby('day_of_week').size().to_dict() + results['requests_by_day_of_week'] = {k: int(v) for k, v in requests_by_dow.items()} + + # 2. Cache miss patterns (cache misses are slower) + if 'cache_status' in df.columns: + df_with_cache = df[df['cache_status'].notna()].copy() + if len(df_with_cache) > 0: + # Cache miss rate by endpoint + if 'path' in df_with_cache.columns: + cache_by_endpoint = df_with_cache.groupby('path')['cache_status'].apply( + lambda x: (x == 'miss').sum() / len(x) * 100 + ).sort_values(ascending=False).head(20).to_dict() + results['high_cache_miss_endpoints'] = {k: float(v) for k, v in cache_by_endpoint.items()} + + # Cache miss rate by hour (when are cache misses most common?) 
+ if 'timestamp' in df_with_cache.columns: + df_with_cache['timestamp'] = pd.to_datetime(df_with_cache['timestamp'], errors='coerce') + df_with_cache = df_with_cache[df_with_cache['timestamp'].notna()] + if len(df_with_cache) > 0: + df_with_cache['hour'] = df_with_cache['timestamp'].dt.hour + cache_miss_by_hour = df_with_cache.groupby('hour').apply( + lambda x: (x['cache_status'] == 'miss').sum() / len(x) * 100 + ).to_dict() + results['cache_miss_rate_by_hour'] = {int(k): float(v) for k, v in cache_miss_by_hour.items()} + + # 3. Large response sizes (could indicate slow endpoints) + if 'response_size' in df.columns and 'path' in df.columns: + df_with_size = df[df['response_size'].notna() & df['path'].notna()].copy() + if len(df_with_size) > 0: + # Endpoints with largest average response sizes + large_responses = df_with_size.groupby('path')['response_size'].agg(['mean', 'count', 'max']).sort_values('mean', ascending=False).head(20) + results['large_response_endpoints'] = { + k: { + 'mean_size': float(v['mean']), + 'max_size': int(v['max']), + 'request_count': int(v['count']) + } + for k, v in large_responses.to_dict('index').items() + } + + # Very large responses (outliers) + p99_size = df_with_size['response_size'].quantile(0.99) + very_large = df_with_size[df_with_size['response_size'] > p99_size] + if len(very_large) > 0: + results['outlier_large_responses'] = { + 'p99_threshold': float(p99_size), + 'count': int(len(very_large)), + 'percentage': float(len(very_large) / len(df_with_size) * 100), + 'top_endpoints': very_large['path'].value_counts().head(10).to_dict() + } + + # 4. Error correlation with slowness (errors might indicate slowness) + if 'status_code' in df.columns and 'path' in df.columns: + df_with_status = df[df['status_code'].notna() & df['path'].notna()].copy() + if len(df_with_status) > 0: + # Endpoints with high error rates (might be slow/failing) + error_rates = df_with_status.groupby('path').apply( + lambda x: (x['status_code'] >= 400).sum() / len(x) * 100 + ).sort_values(ascending=False).head(20).to_dict() + results['high_error_rate_endpoints'] = {k: float(v) for k, v in error_rates.items()} + + # 5xx errors by hour (server issues might cause slowness) + if 'timestamp' in df_with_status.columns: + df_with_status['timestamp'] = pd.to_datetime(df_with_status['timestamp'], errors='coerce') + df_with_status = df_with_status[df_with_status['timestamp'].notna()] + if len(df_with_status) > 0: + df_with_status['hour'] = df_with_status['timestamp'].dt.hour + server_errors_by_hour = df_with_status[df_with_status['status_code'] >= 500].groupby('hour').size().to_dict() + results['server_errors_by_hour'] = {int(k): int(v) for k, v in server_errors_by_hour.items()} + + # 5. 
Query parameter patterns that might cause slowness + if 'query_params' in df.columns and 'path' in df.columns: + # Find endpoints with complex queries (many parameters) + complex_queries = [] + for entry in entries: + query_params = entry.get('query_params', {}) + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + param_count = len(query_params) if query_params else 0 + if param_count > 5: # More than 5 parameters might indicate complex queries + complex_queries.append({ + 'path': entry.get('path'), + 'param_count': param_count, + 'params': list(query_params.keys()) if query_params else [] + }) + + if complex_queries: + complex_df = pd.DataFrame(complex_queries) + if len(complex_df) > 0: + complex_by_endpoint = complex_df.groupby('path')['param_count'].agg(['mean', 'max', 'count']).sort_values('mean', ascending=False).head(20) + results['complex_query_endpoints'] = { + k: { + 'avg_params': float(v['mean']), + 'max_params': int(v['max']), + 'request_count': int(v['count']) + } + for k, v in complex_by_endpoint.to_dict('index').items() + } + + # 6. IP address patterns (maybe certain IPs are causing slowness) + if 'ip_address' in df.columns: + df_with_ip = df[df['ip_address'].notna()].copy() + if len(df_with_ip) > 0: + # Top IPs by request volume (might indicate bots/crawlers causing load) + top_ips = df_with_ip['ip_address'].value_counts().head(20).to_dict() + results['top_request_ips'] = {k: int(v) for k, v in top_ips.items()} + + # 7. User agent patterns (certain clients might be slower) + if 'user_agent' in df.columns and 'response_size' in df.columns: + df_with_ua = df[df['user_agent'].notna() & df['response_size'].notna()].copy() + if len(df_with_ua) > 0: + # Average response size by user agent (some clients might get different responses) + ua_response_sizes = df_with_ua.groupby('user_agent')['response_size'].agg(['mean', 'count']).sort_values('mean', ascending=False).head(10).to_dict('index') + results['user_agent_response_sizes'] = { + k: {'mean_size': float(v['mean']), 'request_count': int(v['count'])} + for k, v in ua_response_sizes.items() + } + + return results + + +def generate_report(analytics: Dict, output_format: str, output_path: Optional[Path] = None): + """Generate and output analytics report.""" + if output_format == 'json': + output = json.dumps(analytics, indent=2) + if output_path: + with open(output_path, 'w') as f: + f.write(output) + else: + print(output) + elif output_format == 'console': + print("\n" + "="*80) + print("FASTLY LOG ANALYTICS REPORT") + print("="*80) + + # Traffic Patterns + if 'traffic' in analytics: + tp = analytics['traffic'] + print("\n## Traffic Patterns") + print(f"Total Requests: {tp.get('total_requests', 0):,}") + print(f"\nHTTP Methods:") + for method, count in tp.get('http_methods', {}).items(): + print(f" {method}: {count:,}") + print(f"\nTop 10 Endpoints:") + for endpoint, count in list(tp.get('popular_endpoints', {}).items())[:10]: + print(f" {endpoint}: {count:,}") + + # Error Analysis + if 'errors' in analytics: + err = analytics['errors'] + print("\n## Error Analysis") + print(f"Total Requests: {err.get('total_requests', 0):,}") + print(f"4xx Errors: {err.get('error_4xx_count', 0):,} ({err.get('error_4xx_rate', 0):.2f}%)") + print(f"5xx Errors: {err.get('error_5xx_count', 0):,} ({err.get('error_5xx_rate', 0):.2f}%)") + print(f"Total Error Rate: {err.get('total_error_rate', 0):.2f}%") + print(f"\nStatus Code Distribution:") + for code, count in 
sorted(err.get('status_code_distribution', {}).items()): + print(f" {code}: {count:,}") + + # Performance + if 'performance' in analytics: + perf = analytics['performance'] + print("\n## Performance Metrics") + print(f"Cache Hit Rate: {perf.get('cache_hit_rate', 0):.2f}%") + print(f"Cache Miss Rate: {perf.get('cache_miss_rate', 0):.2f}%") + rs = perf.get('response_size_statistics', {}) + print(f"\nResponse Size Statistics:") + print(f" Mean: {rs.get('mean', 0):.2f} bytes") + print(f" Median: {rs.get('median', 0):.2f} bytes") + print(f" P95: {rs.get('p95', 0):.2f} bytes") + print(f" P99: {rs.get('p99', 0):.2f} bytes") + + # User Agents + if 'user_agents' in analytics: + ua = analytics['user_agents'] + print("\n## User Agent Analysis") + print(f"Agent Type Distribution:") + for agent_type, count in ua.get('agent_type_distribution', {}).items(): + print(f" {agent_type}: {count:,}") + + # Query Patterns + if 'query_patterns' in analytics: + qp = analytics['query_patterns'] + print("\n## Query Parameter Analysis") + print(f"Most Common Parameters:") + for param, count in list(qp.get('most_common_parameters', {}).items())[:10]: + print(f" {param}: {count:,}") + + # Slowness Investigation + if 'slowness_investigation' in analytics: + slow = analytics['slowness_investigation'] + print("\n## Slowness Investigation") + + # Time-based patterns + if 'requests_by_hour' in slow: + print("\n### Traffic by Hour (identify peak times)") + peak_hours = sorted(slow['requests_by_hour'].items(), key=lambda x: x[1], reverse=True)[:5] + for hour, count in peak_hours: + print(f" Hour {hour:02d}:00 - {count:,} requests") + + if 'cache_miss_rate_by_hour' in slow: + print("\n### Cache Miss Rate by Hour (cache misses are slower)") + for hour in sorted(slow['cache_miss_rate_by_hour'].keys()): + rate = slow['cache_miss_rate_by_hour'][hour] + print(f" Hour {hour:02d}:00 - {rate:.1f}% cache miss rate") + + # High cache miss endpoints + if 'high_cache_miss_endpoints' in slow: + print("\n### Endpoints with High Cache Miss Rates (>50%)") + high_miss = {k: v for k, v in slow['high_cache_miss_endpoints'].items() if v > 50} + if high_miss: + for endpoint, rate in sorted(high_miss.items(), key=lambda x: x[1], reverse=True)[:10]: + print(f" {endpoint}: {rate:.1f}% miss rate") + else: + print(" (No endpoints with >50% cache miss rate)") + + # Large response sizes + if 'large_response_endpoints' in slow: + print("\n### Endpoints with Largest Average Response Sizes") + for endpoint, data in list(slow['large_response_endpoints'].items())[:10]: + size_mb = data['mean_size'] / (1024 * 1024) + print(f" {endpoint}: {size_mb:.2f} MB avg ({data['request_count']:,} requests)") + + # Outlier large responses + if 'outlier_large_responses' in slow: + outlier = slow['outlier_large_responses'] + print(f"\n### Very Large Responses (Outliers)") + print(f" P99 threshold: {outlier['p99_threshold'] / (1024*1024):.2f} MB") + print(f" Outlier count: {outlier['count']:,} ({outlier['percentage']:.2f}% of requests)") + if 'top_endpoints' in outlier and outlier['top_endpoints']: + print(f" Top endpoints with outliers:") + for endpoint, count in list(outlier['top_endpoints'].items())[:5]: + print(f" {endpoint}: {count:,}") + + # High error rate endpoints + if 'high_error_rate_endpoints' in slow: + print("\n### Endpoints with High Error Rates (might indicate slowness)") + high_errors = {k: v for k, v in slow['high_error_rate_endpoints'].items() if v > 5} + if high_errors: + for endpoint, rate in sorted(high_errors.items(), key=lambda x: x[1], 
reverse=True)[:10]: + print(f" {endpoint}: {rate:.1f}% error rate") + else: + print(" (No endpoints with >5% error rate)") + + # Server errors by hour + if 'server_errors_by_hour' in slow and slow['server_errors_by_hour']: + print("\n### Server Errors (5xx) by Hour") + for hour in sorted(slow['server_errors_by_hour'].keys()): + count = slow['server_errors_by_hour'][hour] + print(f" Hour {hour:02d}:00 - {count:,} server errors") + + # Complex queries + if 'complex_query_endpoints' in slow: + print("\n### Endpoints with Complex Queries (>5 parameters avg)") + for endpoint, data in list(slow['complex_query_endpoints'].items())[:10]: + print(f" {endpoint}: {data['avg_params']:.1f} avg params ({data['request_count']:,} requests)") + + # Top request IPs + if 'top_request_ips' in slow: + print("\n### Top Request IPs (might indicate bots/crawlers)") + for ip, count in list(slow['top_request_ips'].items())[:10]: + print(f" {ip}: {count:,} requests") + + print("\n" + "="*80) + + if output_path: + # Also save JSON to file + json_output = json.dumps(analytics, indent=2) + with open(output_path, 'w') as f: + f.write(json_output) + else: + raise ValueError(f"Unsupported output format: {output_format}") + + +def main(): + parser = argparse.ArgumentParser(description='Analyze parsed Fastly log data') + parser.add_argument( + '--input', + type=str, + required=True, + help='Input file (parsed JSON or CSV)' + ) + parser.add_argument( + '--output', + type=str, + help='Output file path (optional)' + ) + parser.add_argument( + '--format', + choices=['json', 'console'], + default='console', + help='Output format (default: console)' + ) + + args = parser.parse_args() + + input_path = Path(args.input) + if not input_path.exists(): + print(f"Error: Input file does not exist: {input_path}", file=sys.stderr) + sys.exit(1) + + print(f"Loading data from {input_path}...") + entries = load_data(input_path) + print(f"Loaded {len(entries):,} log entries") + + print("Generating analytics...") + analytics = { + 'traffic': analyze_traffic_patterns(entries), + 'errors': analyze_errors(entries), + 'performance': analyze_performance(entries), + 'user_agents': analyze_user_agents(entries), + 'query_patterns': analyze_query_patterns(entries), + 'slowness_investigation': analyze_slowness_patterns(entries) + } + + output_path = Path(args.output) if args.output else None + generate_report(analytics, args.format, output_path) + + if output_path: + print(f"\nReport saved to {output_path}") + print("Done!") + + +if __name__ == '__main__': + main() + diff --git a/scripts/parse_logs.py b/scripts/parse_logs.py new file mode 100755 index 0000000..5e97e7e --- /dev/null +++ b/scripts/parse_logs.py @@ -0,0 +1,347 @@ +#!/usr/bin/env python3 +""" +Fastly Log Parser +Parses syslog-style Fastly log entries and converts them to structured data. 
+""" + +import re +import gzip +import json +import csv +import argparse +from pathlib import Path +from typing import Dict, List, Optional +from datetime import datetime +import sys + + +# Fastly log format regex pattern +# Format: timestamp cache-server process[pid]: IP "-" "-" date "METHOD path" status size "-" "user-agent" cache-status +LOG_PATTERN = re.compile( + r'<(\d+)>' # Priority code + r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)' # Timestamp + r'\s+(\S+)' # Cache server + r'\s+(\S+)\[(\d+)\]:' # Process and PID + r'\s+(\S+)' # IP address + r'\s+"([^"]*)"' # First "-" field (usually referrer) + r'\s+"([^"]*)"' # Second "-" field + r'\s+([^"]+?)(?=\s+")' # Date string (non-greedy until next quote) + r'\s+"([A-Z]+)\s+([^"]+)"' # HTTP method and path + r'\s+(\d+)' # Status code + r'\s+(\d+)' # Response size + r'\s+"([^"]*)"' # Referrer + r'\s+"([^"]*)"' # User agent + r'\s+(\S+)' # Cache status (hit/miss) +) + + +def safe_int(value, default=None): + """Safely convert value to int, return default if fails.""" + try: + return int(value) if value else default + except (ValueError, TypeError): + return default + +def safe_get(groups, index, default=None): + """Safely get group from regex match, return default if out of bounds.""" + try: + return groups[index] if index < len(groups) and groups[index] else default + except (IndexError, TypeError): + return default + +def parse_log_line(line: str) -> Optional[Dict]: + """ + Parse a single log line and return structured data. + Truly lazy parsing - extracts whatever fields are available using individual patterns. + Doesn't rely on a fixed format - works with any log structure. + + Args: + line: Raw log line string + + Returns: + Dictionary with parsed fields or None if line is empty + """ + line = line.strip() + if not line: + return None + + # Start with raw line + result = {'raw_line': line} + + # Try LOG_PATTERN first as an optimization (faster for standard format) + match = LOG_PATTERN.match(line) + if match: + groups = match.groups() + result['priority'] = safe_int(safe_get(groups, 0)) + timestamp_str = safe_get(groups, 1) + if timestamp_str: + try: + timestamp = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%SZ') + result['timestamp'] = timestamp.isoformat() + except (ValueError, TypeError): + result['timestamp'] = None + result['cache_server'] = safe_get(groups, 2) + result['process'] = safe_get(groups, 3) + result['pid'] = safe_int(safe_get(groups, 4)) + result['ip_address'] = safe_get(groups, 5) + result['referrer1'] = safe_get(groups, 6) + result['referrer2'] = safe_get(groups, 7) + result['date_string'] = safe_get(groups, 8) + method = safe_get(groups, 9) + full_path = safe_get(groups, 10) + if full_path: + path_parts = full_path.split('?', 1) + result['path'] = path_parts[0] + result['query_string'] = path_parts[1] if len(path_parts) > 1 else None + query_params = {} + if result['query_string']: + for param in result['query_string'].split('&'): + if '=' in param: + key, value = param.split('=', 1) + query_params[key] = value + result['query_params'] = query_params + result['http_method'] = method + result['status_code'] = safe_int(safe_get(groups, 11)) + result['response_size'] = safe_int(safe_get(groups, 12)) + result['referrer'] = safe_get(groups, 13) + result['user_agent'] = safe_get(groups, 14) + result['cache_status'] = safe_get(groups, 15) + return result + + # Fallback: Extract fields individually (lazy mode - works with any format) + # Extract timestamp (ISO format with Z) + timestamp_match = 
re.search(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)', line) + if timestamp_match: + try: + timestamp = datetime.strptime(timestamp_match.group(1), '%Y-%m-%dT%H:%M:%SZ') + result['timestamp'] = timestamp.isoformat() + except ValueError: + pass + + # Extract priority code + priority_match = re.search(r'<(\d+)>', line) + if priority_match: + result['priority'] = safe_int(priority_match.group(1)) + + # Extract IP address + ip_match = re.search(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', line) + if ip_match: + result['ip_address'] = ip_match.group(1) + + # Extract HTTP method and path + http_match = re.search(r'"([A-Z]+)\s+([^"]+)"', line) + if http_match: + result['http_method'] = http_match.group(1) + full_path = http_match.group(2) + path_parts = full_path.split('?', 1) + result['path'] = path_parts[0] + result['query_string'] = path_parts[1] if len(path_parts) > 1 else None + # Parse query parameters + if result.get('query_string'): + query_params = {} + for param in result['query_string'].split('&'): + if '=' in param: + key, value = param.split('=', 1) + query_params[key] = value + result['query_params'] = query_params + else: + result['query_params'] = {} + + # Extract status code (3-digit number) + status_match = re.search(r'\s(\d{3})\s', line) + if status_match: + result['status_code'] = safe_int(status_match.group(1)) + + # Extract response size (number after status code) + size_match = re.search(r'\s(\d{3})\s+(\d+)\s', line) + if size_match: + result['response_size'] = safe_int(size_match.group(2)) + + # Extract user agent (in quotes) + ua_match = re.search(r'"([^"]*Mozilla[^"]*)"', line) + if ua_match: + result['user_agent'] = ua_match.group(1) + else: + # Try any quoted string that looks like a user agent + ua_match = re.search(r'"([^"]{20,})"', line) + if ua_match and 'Mozilla' in ua_match.group(1): + result['user_agent'] = ua_match.group(1) + + # Extract cache status (hit/miss/etc) + cache_match = re.search(r'\s(hit|miss|pass|error|synth)\s*$', line) + if cache_match: + result['cache_status'] = cache_match.group(1) + + # Extract cache server (word before process) + server_match = re.search(r'cache-([^\s]+)', line) + if server_match: + result['cache_server'] = 'cache-' + server_match.group(1) + + # Extract process[pid] + process_match = re.search(r'(\S+)\[(\d+)\]:', line) + if process_match: + result['process'] = process_match.group(1) + result['pid'] = safe_int(process_match.group(2)) + + return result + + +def process_log_file(file_path: Path): + """ + Process a log file (compressed or uncompressed) and yield parsed entries lazily. 
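As an aside, here is a minimal sketch of what `parse_log_line` returns for a line in the standard shape described by the `LOG_PATTERN` comment. The sample line, hostnames, and path are invented for illustration, and the import goes through the `src.parse` re-export of the same function (the copy in `scripts/parse_logs.py` behaves identically); it assumes the repository root is on `sys.path`.

```python
# Hypothetical log line shaped like the LOG_PATTERN comment:
# <priority>timestamp cache-server process[pid]: IP "-" "-" date "METHOD path" status size "-" "UA" cache-status
from src.parse import parse_log_line  # assumes the repo root is on sys.path

sample = (
    '<134>2025-11-10T12:34:56Z cache-syd10021 varnishncsa[4242]: '
    '203.0.113.7 "-" "-" Mon, 10 Nov 2025 12:34:56 GMT '
    '"GET /quake/search?limit=10&minmag=4" 200 5123 "-" "Mozilla/5.0" hit'
)

entry = parse_log_line(sample)
print(entry["timestamp"])      # 2025-11-10T12:34:56
print(entry["path"])           # /quake/search
print(entry["query_params"])   # {'limit': '10', 'minmag': '4'}
print(entry["status_code"])    # 200
print(entry["cache_status"])   # hit
```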
+ + Args: + file_path: Path to the log file + + Yields: + Parsed log entry dictionaries + """ + try: + if file_path.suffix == '.gz': + with gzip.open(file_path, 'rt', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + entry = parse_log_line(line) + if entry: + entry['source_file'] = str(file_path) + entry['line_number'] = line_num + yield entry + else: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + entry = parse_log_line(line) + if entry: + entry['source_file'] = str(file_path) + entry['line_number'] = line_num + yield entry + except Exception as e: + print(f"Error processing {file_path}: {e}", file=sys.stderr) + + +def save_json_streaming(entries, output_path: Path): + """Save parsed data as JSON using streaming (memory efficient).""" + # Create directory if it doesn't exist + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, 'w') as f: + f.write('[\n') + first = True + for entry in entries: + if not first: + f.write(',\n') + json.dump(entry, f, indent=2) + first = False + f.write('\n]') + + +def save_csv_streaming(entries, output_path: Path): + """Save parsed data as CSV using streaming (memory efficient).""" + # Create directory if it doesn't exist + output_path.parent.mkdir(parents=True, exist_ok=True) + writer = None + first_entry = True + + with open(output_path, 'w', newline='') as f: + for entry in entries: + # Flatten query_params for CSV + flat_entry = entry.copy() + # Convert query_params dict to string for CSV + flat_entry['query_params'] = json.dumps(entry['query_params']) + + if first_entry: + # Initialize writer with fieldnames from first entry + fieldnames = flat_entry.keys() + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + first_entry = False + + writer.writerow(flat_entry) + + +def main(): + parser = argparse.ArgumentParser(description='Parse Fastly log files') + parser.add_argument( + '--input-dir', + type=str, + default='./logs/srv_quakesearch-fastly/raw', + help='Directory containing log files (default: ./logs/srv_quakesearch-fastly/raw)' + ) + parser.add_argument( + '--output', + type=str, + default='./logs/srv_quakesearch-fastly/parsed/parsed_logs.json', + help='Output file path (default: ./logs/srv_quakesearch-fastly/parsed/parsed_logs.json)' + ) + parser.add_argument( + '--format', + choices=['json', 'csv'], + default='json', + help='Output format (default: json)' + ) + parser.add_argument( + '--pattern', + type=str, + default='*.log*', + help='File pattern to match (default: *.log*)' + ) + + args = parser.parse_args() + + input_dir = Path(args.input_dir) + if not input_dir.exists(): + print(f"Error: Input directory does not exist: {input_dir}", file=sys.stderr) + sys.exit(1) + + # Find all log files + log_files = [] + for pattern in [args.pattern, '*.log.gz', '*.log']: + log_files.extend(input_dir.glob(pattern)) + + # Remove duplicates + log_files = list(set(log_files)) + + if not log_files: + print(f"No log files found in {input_dir}", file=sys.stderr) + sys.exit(1) + + print(f"Found {len(log_files)} log file(s)") + print("Parsing logs (lazy/streaming mode - memory efficient)...") + + # Generator function that yields all entries from all files + total_count = 0 + def all_entries_generator(): + nonlocal total_count + for log_file in sorted(log_files): + print(f" Processing: {log_file.name}") + file_count = 0 + for entry in process_log_file(log_file): + file_count += 1 + total_count += 1 + yield entry + print(f" 
Parsed {file_count} entries") + + # Determine output path + if args.output: + output_path = Path(args.output) + else: + output_path = Path(args.input_dir) / f"parsed_logs.{args.format}" + + # Check if output file exists and warn + if output_path.exists(): + print(f"Warning: Output file already exists: {output_path}") + print(f" It will be OVERWRITTEN with new data.") + print(f" (Previous data will be lost)") + + # Save output using streaming (overwrites existing file) + print(f"Saving to {output_path}...") + if args.format == 'json': + save_json_streaming(all_entries_generator(), output_path) + else: + save_csv_streaming(all_entries_generator(), output_path) + + print(f"\nTotal entries parsed: {total_count}") + + print("Done!") + + +if __name__ == '__main__': + main() + diff --git a/scripts/query_logs.py b/scripts/query_logs.py new file mode 100755 index 0000000..5d8a5a3 --- /dev/null +++ b/scripts/query_logs.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +Fastly Log Query Orchestration Script +Orchestrates the full pipeline: sync → parse → analyze +""" + +import argparse +import subprocess +import sys +from pathlib import Path +from datetime import datetime, timezone + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from src.utils.config_loader import load_config, get_enabled_sources +from src.utils.date_utils import parse_date_range + +# Colors for output +class Colors: + RED = '\033[0;31m' + GREEN = '\033[0;32m' + YELLOW = '\033[1;33m' + BLUE = '\033[0;34m' + NC = '\033[0m' # No Color + + +def check_python_packages(): + """Check if required Python packages are installed.""" + try: + import pandas + return True + except ImportError: + return False + + +def install_packages(): + """Install required packages.""" + project_root = Path(__file__).parent.parent + requirements_file = project_root / "requirements.txt" + + print(f"{Colors.YELLOW}Warning: Required Python packages not found. 
Installing...{Colors.NC}") + try: + subprocess.run( + [sys.executable, "-m", "pip", "install", "-r", str(requirements_file)], + check=True + ) + return True + except subprocess.CalledProcessError: + print(f"{Colors.RED}Error: Failed to install Python dependencies{Colors.NC}", file=sys.stderr) + return False + + +def run_sync(start_date: str, end_date: str, single_date: str = None): + """Run sync operation.""" + script_dir = Path(__file__).parent + sync_script = script_dir / "sync_logs.py" + + args = [sys.executable, str(sync_script)] + if single_date: + args.extend(["--date", single_date]) + else: + args.extend(["--start-date", start_date, "--end-date", end_date]) + + print(f"{Colors.GREEN}[1/3] Syncing logs from S3...{Colors.NC}") + try: + result = subprocess.run(args, check=True) + print() + return result.returncode == 0 + except subprocess.CalledProcessError: + print(f"{Colors.RED}Error: Sync failed{Colors.NC}", file=sys.stderr) + return False + + +def run_parse(): + """Run parse operation for all enabled sources.""" + script_dir = Path(__file__).parent + parse_script = script_dir / "parse_logs.py" + + print(f"{Colors.GREEN}[2/3] Parsing log files...{Colors.NC}") + + # Check Python packages + if not check_python_packages(): + if not install_packages(): + return None + + # Get enabled sources + try: + sources = load_config() + enabled = get_enabled_sources(sources) + except Exception as e: + print(f"{Colors.RED}Error: Failed to load configuration: {e}{Colors.NC}", file=sys.stderr) + return None + + if not enabled: + print(f"{Colors.RED}Error: No enabled sources found to parse{Colors.NC}", file=sys.stderr) + return None + + # Parse each enabled source + parsed_outputs = [] + for name, config in enabled.items(): + local_dir = config.get('local_dir', f"logs/{name}/raw") + parsed_dir = config.get('parsed_dir', f"logs/{name}/parsed") + output_file = f"{parsed_dir}/parsed_logs.json" + + print(f"{Colors.BLUE} Parsing source: {name}{Colors.NC}") + + args = [ + sys.executable, str(parse_script), + "--input-dir", local_dir, + "--output", output_file, + "--format", "json" + ] + + try: + subprocess.run(args, check=True) + parsed_outputs.append(output_file) + except subprocess.CalledProcessError: + print(f"{Colors.YELLOW}Warning: Parsing failed for {name}, continuing...{Colors.NC}", file=sys.stderr) + + print() + return parsed_outputs[0] if parsed_outputs else None + + +def run_analyze(parsed_output: str, analytics_output: str = None): + """Run analyze operation.""" + script_dir = Path(__file__).parent + analyze_script = script_dir / "analyze_logs.py" + + print(f"{Colors.GREEN}[3/3] Generating analytics...{Colors.NC}") + + # Check if parsed output exists + if not Path(parsed_output).exists(): + print(f"{Colors.RED}Error: Parsed log file not found: {parsed_output}{Colors.NC}", file=sys.stderr) + print("Run parse operation first or specify correct --parsed-output path", file=sys.stderr) + return False + + args = [ + sys.executable, str(analyze_script), + "--input", parsed_output, + "--format", "console" + ] + + if analytics_output: + args.extend(["--output", analytics_output]) + + try: + subprocess.run(args, check=True) + print() + return True + except subprocess.CalledProcessError: + print(f"{Colors.RED}Error: Analytics failed{Colors.NC}", file=sys.stderr) + return False + + +def main(): + parser = argparse.ArgumentParser( + description="Fastly Log Query Tool - Orchestrates sync, parse, and analyze operations", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s 
--date 2025-11-10 + %(prog)s --start-date 2025-11-10 --end-date 2025-11-12 --operation sync + %(prog)s --operation analyze --parsed-output ./logs/srv_quakesearch-fastly/parsed/parsed_logs.json + """ + ) + parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD format") + parser.add_argument("--end-date", type=str, help="End date in YYYY-MM-DD format") + parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD format (syncs from this date to today)") + parser.add_argument( + "--operation", + choices=["sync", "parse", "analyze", "all"], + default="all", + help="Operation to perform (default: all)" + ) + parser.add_argument( + "--parsed-output", + type=str, + default=None, + help="Output file for parsed logs (default: first enabled source's parsed output)" + ) + parser.add_argument( + "--analytics-output", + type=str, + help="Output file for analytics report (optional)" + ) + + args = parser.parse_args() + + # Parse date range + try: + start_date, end_date = parse_date_range( + args.start_date, args.end_date, args.date + ) + except ValueError as e: + print(f"{Colors.RED}Error: {e}{Colors.NC}", file=sys.stderr) + parser.print_help() + sys.exit(1) + + if args.date: + print(f"{Colors.YELLOW}Note: --date specified, syncing from {start_date} to {end_date} (today UTC){Colors.NC}") + + # Validate date parameters for sync operation + if args.operation in ["sync", "all"]: + if not start_date or not end_date: + print(f"{Colors.RED}Error: Start date and end date are required for sync operation{Colors.NC}", file=sys.stderr) + parser.print_help() + sys.exit(1) + + print(f"{Colors.BLUE}Fastly Log Query Tool{Colors.NC}") + print("========================") + print() + + # Step 1: Sync logs from S3 + if args.operation in ["sync", "all"]: + if not run_sync(start_date, end_date, args.date): + sys.exit(1) + + # Step 2: Parse logs + parsed_output = args.parsed_output + if args.operation in ["parse", "all"]: + result = run_parse() + if result is None: + sys.exit(1) + # Use parsed output from run_parse if not explicitly set + if not parsed_output: + parsed_output = result + + # Step 3: Analyze logs + if args.operation in ["analyze", "all"]: + if not parsed_output: + # Try to get first enabled source's output + try: + sources = load_config() + enabled = get_enabled_sources(sources) + if enabled: + first_source = next(iter(enabled)) + first_config = enabled[first_source] + parsed_dir = first_config.get('parsed_dir', f"logs/{first_source}/parsed") + parsed_output = f"{parsed_dir}/parsed_logs.json" + else: + print(f"{Colors.RED}Error: No enabled sources found{Colors.NC}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"{Colors.RED}Error: Failed to determine parsed output: {e}{Colors.NC}", file=sys.stderr) + sys.exit(1) + + if not run_analyze(parsed_output, args.analytics_output): + sys.exit(1) + + print(f"{Colors.GREEN}All operations completed successfully!{Colors.NC}") + + +if __name__ == "__main__": + main() + diff --git a/scripts/sync_logs.py b/scripts/sync_logs.py new file mode 100755 index 0000000..dbcfa9f --- /dev/null +++ b/scripts/sync_logs.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 + +""" +Fastly Log Sync Script +CLI entry point for syncing logs from multiple sources +""" + +import argparse +import sys +from pathlib import Path + +# Add src to path for imports (go up one level from scripts/ to root) +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from src.utils.config_loader import load_config, get_enabled_sources +from src.utils.date_utils 
import parse_date_range +from src.sync.sync_manager import SyncManager + +# Colors for output +class Colors: + RED = '\033[0;31m' + GREEN = '\033[0;32m' + YELLOW = '\033[1;33m' + BLUE = '\033[0;34m' + NC = '\033[0m' # No Color + +def main(): + parser = argparse.ArgumentParser( + description="Sync logs from S3 (supports multiple sources via config)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Sync default (first enabled) source + %(prog)s --date 2025-11-10 + + # Sync specific source + %(prog)s --source srv_quakesearch-fastly --date 2025-11-10 + + # Sync all enabled sources + %(prog)s --all-sources --start-date 2025-11-10 --end-date 2025-11-12 + + # Sync with custom config file + %(prog)s --config custom_config.yaml --source srv_quakesearch-fastly --date 2025-11-10 + """ + ) + parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD format") + parser.add_argument("--end-date", type=str, help="End date in YYYY-MM-DD format") + parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD format (syncs from this date to today)") + parser.add_argument("--source", type=str, help="Specific log source to sync (from config)") + parser.add_argument("--all-sources", action="store_true", help="Sync all enabled sources") + parser.add_argument("--list-sources", action="store_true", help="List all available log sources") + parser.add_argument("--config", type=str, help="Path to configuration file (default: config/log_sources.yaml)") + parser.add_argument("--workers", type=int, default=10, help="Number of concurrent workers (default: 10)") + + args = parser.parse_args() + + # Load configuration + config_path = Path(args.config) if args.config else None + sources = load_config(config_path) + enabled_sources = get_enabled_sources(sources) + + # List sources if requested + if args.list_sources: + print(f"{Colors.BLUE}Available log sources:{Colors.NC}\n") + for name, config in sources.items(): + status = f"{Colors.GREEN}ENABLED{Colors.NC}" if config.get('enabled', False) else f"{Colors.YELLOW}DISABLED{Colors.NC}" + print(f" {name}: {status}") + print(f" Description: {config.get('description', 'N/A')}") + print(f" Type: {config.get('type', 'N/A')}") + if config.get('type') == 's3': + print(f" S3 Bucket: {config.get('s3_bucket', 'N/A')}") + print() + sys.exit(0) + + # Parse date range + try: + start_date, end_date = parse_date_range( + args.start_date, args.end_date, args.date + ) + except ValueError as e: + print(f"{Colors.RED}Error: {e}{Colors.NC}", file=sys.stderr) + parser.print_help() + sys.exit(1) + + if args.date: + print(f"{Colors.YELLOW}Note: --date specified, syncing from {start_date} to {end_date} (today UTC){Colors.NC}") + + # Determine which sources to sync + sources_to_sync = {} + + if args.all_sources: + sources_to_sync = enabled_sources + if not sources_to_sync: + print(f"{Colors.RED}Error: No enabled sources found in configuration{Colors.NC}", file=sys.stderr) + sys.exit(1) + elif args.source: + if args.source not in sources: + print(f"{Colors.RED}Error: Source '{args.source}' not found in configuration{Colors.NC}", file=sys.stderr) + print(f"Available sources: {', '.join(sources.keys())}", file=sys.stderr) + sys.exit(1) + if not sources[args.source].get('enabled', False): + print(f"{Colors.YELLOW}Warning: Source '{args.source}' is disabled{Colors.NC}", file=sys.stderr) + sources_to_sync = {args.source: sources[args.source]} + else: + # Default: sync all enabled sources automatically + if not enabled_sources: + 
print(f"{Colors.RED}Error: No enabled sources found in configuration{Colors.NC}", file=sys.stderr) + sys.exit(1) + sources_to_sync = enabled_sources + print(f"{Colors.YELLOW}Note: No source specified, syncing all enabled sources: {', '.join(sources_to_sync.keys())}{Colors.NC}") + + print(f"{Colors.GREEN}Starting log sync...{Colors.NC}") + print(f"Date Range: {start_date} to {end_date}") + print(f"Sources to sync: {', '.join(sources_to_sync.keys())}") + print(f"Concurrent Workers: {args.workers}") + print() + + # Create sync manager and sync sources + sync_manager = SyncManager(sources_to_sync) + + total_downloads = 0 + total_skips = 0 + total_errors = 0 + + for source_name in sources_to_sync: + downloads, skips, errors = sync_manager.sync_source( + source_name, start_date, end_date, args.workers + ) + total_downloads += downloads + total_skips += skips + total_errors += errors + + print() + print(f"{Colors.GREEN}{'='*60}{Colors.NC}") + print(f"{Colors.GREEN}All syncs completed!{Colors.NC}") + print(f" Total files downloaded: {total_downloads}") + print(f" Total files skipped: {total_skips}") + if total_errors > 0: + print(f" {Colors.RED}Total errors: {total_errors}{Colors.NC}") + sys.exit(1) + + sys.exit(0) + +if __name__ == "__main__": + main() diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..079da64 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,4 @@ +""" +Fastly Log Query - Source Package +""" + diff --git a/src/analyze/__init__.py b/src/analyze/__init__.py new file mode 100644 index 0000000..0a48d0b --- /dev/null +++ b/src/analyze/__init__.py @@ -0,0 +1,32 @@ +""" +Log analytics modules +""" + +from .analytics import ( + load_data, + analyze_traffic_patterns, + analyze_errors, + analyze_performance, + analyze_user_agents, + analyze_query_patterns, + analyze_slowness_patterns, + analyze_endpoint, + analyze_daily_summary, + create_query_signature, + generate_report +) + +__all__ = [ + 'load_data', + 'analyze_traffic_patterns', + 'analyze_errors', + 'analyze_performance', + 'analyze_user_agents', + 'analyze_query_patterns', + 'analyze_slowness_patterns', + 'analyze_endpoint', + 'analyze_daily_summary', + 'create_query_signature', + 'generate_report' +] + diff --git a/src/analyze/analytics.py b/src/analyze/analytics.py new file mode 100755 index 0000000..e19af59 --- /dev/null +++ b/src/analyze/analytics.py @@ -0,0 +1,961 @@ +#!/usr/bin/env python3 +""" +Fastly Log Analytics +Generates analytics reports from parsed Fastly log data. 
+""" + +import json +import csv +import argparse +import sys +from pathlib import Path +from collections import Counter, defaultdict +from datetime import datetime +from typing import Dict, List, Optional +import pandas as pd +import numpy as np + + +def load_data(input_path: Path) -> List[Dict]: + """Load parsed log data from JSON or CSV file.""" + if input_path.suffix == '.json': + with open(input_path, 'r') as f: + return json.load(f) + elif input_path.suffix == '.csv': + df = pd.read_csv(input_path) + # Convert query_params string back to dict if needed + if 'query_params' in df.columns: + df['query_params'] = df['query_params'].apply( + lambda x: json.loads(x) if isinstance(x, str) else {} + ) + return df.to_dict('records') + else: + raise ValueError(f"Unsupported file format: {input_path.suffix}") + + +def analyze_traffic_patterns(entries: List[Dict]) -> Dict: + """Analyze traffic patterns.""" + if not entries: + return {} + + # Convert to DataFrame for easier analysis + df = pd.DataFrame(entries) + + # Handle missing timestamps + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df_with_timestamp = df[df['timestamp'].notna()] + + if len(df_with_timestamp) > 0: + # Requests per hour + df_with_timestamp['hour'] = df_with_timestamp['timestamp'].dt.floor('h') + requests_per_hour = df_with_timestamp.groupby('hour').size().to_dict() + + # Requests per day + df_with_timestamp['day'] = df_with_timestamp['timestamp'].dt.date + requests_per_day = {str(k): int(v) for k, v in df_with_timestamp.groupby('day').size().to_dict().items()} + else: + requests_per_hour = {} + requests_per_day = {} + else: + requests_per_hour = {} + requests_per_day = {} + + # Popular endpoints (handle None values) + if 'path' in df.columns: + endpoint_counts = df['path'].dropna().value_counts().head(20).to_dict() + else: + endpoint_counts = {} + + # Popular HTTP methods (handle None values) + if 'http_method' in df.columns: + method_counts = df['http_method'].dropna().value_counts().to_dict() + else: + method_counts = {} + + return { + 'total_requests': len(entries), + 'requests_per_hour': {str(k): int(v) for k, v in requests_per_hour.items()}, + 'requests_per_day': requests_per_day, + 'popular_endpoints': {k: int(v) for k, v in endpoint_counts.items()}, + 'http_methods': {k: int(v) for k, v in method_counts.items()} + } + + +def analyze_errors(entries: List[Dict]) -> Dict: + """Analyze error patterns.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Status code distribution (handle None values) + if 'status_code' in df.columns: + status_counts = df['status_code'].dropna().value_counts().to_dict() + df_with_status = df[df['status_code'].notna()] + + # Error rates + total = len(df_with_status) + error_4xx = len(df_with_status[df_with_status['status_code'].between(400, 499)]) + error_5xx = len(df_with_status[df_with_status['status_code'].between(500, 599)]) + + # Most common errors + error_entries = df_with_status[df_with_status['status_code'] >= 400] + if 'path' in error_entries.columns: + error_endpoints = error_entries['path'].dropna().value_counts().head(10).to_dict() + else: + error_endpoints = {} + else: + status_counts = {} + total = 0 + error_4xx = 0 + error_5xx = 0 + error_endpoints = {} + + return { + 'status_code_distribution': {str(k): int(v) for k, v in status_counts.items()}, + 'total_requests': int(total), + 'error_4xx_count': int(error_4xx), + 'error_4xx_rate': float(error_4xx / total * 100) if total > 0 else 0, + 'error_5xx_count': 
int(error_5xx), + 'error_5xx_rate': float(error_5xx / total * 100) if total > 0 else 0, + 'total_error_rate': float((error_4xx + error_5xx) / total * 100) if total > 0 else 0, + 'error_endpoints': {k: int(v) for k, v in error_endpoints.items()} + } + + +def analyze_performance(entries: List[Dict]) -> Dict: + """Analyze performance metrics.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Cache hit/miss rates (handle None values) + if 'cache_status' in df.columns: + cache_counts = df['cache_status'].dropna().value_counts().to_dict() + total = len(df[df['cache_status'].notna()]) + hit_count = cache_counts.get('hit', 0) + miss_count = cache_counts.get('miss', 0) + else: + cache_counts = {} + total = 0 + hit_count = 0 + miss_count = 0 + + # Response size statistics (handle None values) + if 'response_size' in df.columns: + df_with_size = df[df['response_size'].notna()] + if len(df_with_size) > 0: + response_size_stats = { + 'mean': float(df_with_size['response_size'].mean()), + 'median': float(df_with_size['response_size'].median()), + 'min': int(df_with_size['response_size'].min()), + 'max': int(df_with_size['response_size'].max()), + 'p95': float(df_with_size['response_size'].quantile(0.95)), + 'p99': float(df_with_size['response_size'].quantile(0.99)) + } + + # Response size by endpoint + if 'path' in df_with_size.columns: + endpoint_sizes = df_with_size.groupby('path')['response_size'].agg(['mean', 'count']).to_dict('index') + top_endpoints_by_size = dict(sorted( + endpoint_sizes.items(), + key=lambda x: x[1]['mean'], + reverse=True + )[:10]) + else: + top_endpoints_by_size = {} + else: + response_size_stats = {} + top_endpoints_by_size = {} + else: + response_size_stats = {} + top_endpoints_by_size = {} + + return { + 'cache_statistics': {k: int(v) for k, v in cache_counts.items()}, + 'cache_hit_rate': float(hit_count / total * 100) if total > 0 else 0, + 'cache_miss_rate': float(miss_count / total * 100) if total > 0 else 0, + 'response_size_statistics': response_size_stats, + 'top_endpoints_by_size': { + k: {'mean_size': float(v['mean']), 'request_count': int(v['count'])} + for k, v in top_endpoints_by_size.items() + } + } + + +def analyze_user_agents(entries: List[Dict]) -> Dict: + """Analyze user agent patterns.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + # Top user agents (handle None values) + if 'user_agent' in df.columns: + top_user_agents = df['user_agent'].dropna().value_counts().head(20).to_dict() + + # Extract browser/agent type + def extract_agent_type(ua): + # Handle None, NaN, and empty values + if pd.isna(ua) or ua is None or (isinstance(ua, str) and not ua): + return 'Unknown' + # Convert to string in case it's not already + ua_str = str(ua) if not isinstance(ua, str) else ua + ua_lower = ua_str.lower() + if 'mozilla' in ua_lower and 'firefox' in ua_lower: + return 'Firefox' + elif 'chrome' in ua_lower and 'safari' in ua_lower: + return 'Chrome' + elif 'safari' in ua_lower and 'chrome' not in ua_lower: + return 'Safari' + elif 'python-requests' in ua_lower: + return 'Python/requests' + elif 'curl' in ua_lower: + return 'curl' + elif 'datadog' in ua_lower: + return 'Datadog' + else: + return 'Other' + + df['agent_type'] = df['user_agent'].apply(extract_agent_type) + agent_type_counts = df['agent_type'].value_counts().to_dict() + else: + top_user_agents = {} + agent_type_counts = {} + + return { + 'top_user_agents': {k: int(v) for k, v in top_user_agents.items()}, + 'agent_type_distribution': {k: int(v) for k, v in 
agent_type_counts.items()} + } + + +def create_query_signature(entry: Dict) -> str: + """ + Create a unique signature for a query based on path and query parameters. + + Args: + entry: Log entry dictionary + + Returns: + Query signature string (path + sorted params) + """ + path = entry.get('path', '') + query_params = entry.get('query_params', {}) + + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + if query_params and isinstance(query_params, dict): + # Sort parameters for consistent signatures + sorted_params = sorted(query_params.items()) + param_str = '&'.join([f"{k}={v}" if v else k for k, v in sorted_params]) + return f"{path}?{param_str}" if param_str else path + else: + return path + + +def analyze_query_patterns(entries: List[Dict]) -> Dict: + """Analyze query parameter patterns.""" + if not entries: + return {} + + # Collect all query parameters + param_counts = Counter() + param_value_counts = defaultdict(Counter) + query_signatures = Counter() + + for entry in entries: + query_params = entry.get('query_params', {}) + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + for param, value in query_params.items(): + param_counts[param] += 1 + param_value_counts[param][value] += 1 + + # Track query signatures (path + params) + query_sig = create_query_signature(entry) + query_signatures[query_sig] += 1 + + # Most common parameters + top_params = dict(param_counts.most_common(20)) + + # Most common values for top parameters + top_param_values = {} + for param in list(top_params.keys())[:10]: + top_param_values[param] = dict(param_value_counts[param].most_common(10)) + + # Top query signatures (unique queries) + top_queries = dict(query_signatures.most_common(20)) + + return { + 'most_common_parameters': {k: int(v) for k, v in top_params.items()}, + 'parameter_value_distributions': { + k: {vk: int(vv) for vk, vv in v.items()} + for k, v in top_param_values.items() + }, + 'top_query_signatures': {k: int(v) for k, v in top_queries.items()} + } + + +def analyze_slowness_patterns(entries: List[Dict]) -> Dict: + """Analyze patterns that might indicate slowness issues.""" + if not entries: + return {} + + df = pd.DataFrame(entries) + + results = {} + + # 1. Time-based patterns (when does slowness occur?) 
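A quick sketch of how `create_query_signature` normalises queries: because the parameters are sorted before being joined, two requests that differ only in parameter order collapse to the same signature. The entries below are made up for illustration, and the import assumes the repository root is on `sys.path`.

```python
from src.analyze import create_query_signature  # assumes repo root on sys.path

a = {"path": "/quake/search", "query_params": {"minmag": "4", "limit": "10"}}
b = {"path": "/quake/search", "query_params": {"limit": "10", "minmag": "4"}}

print(create_query_signature(a))                              # /quake/search?limit=10&minmag=4
print(create_query_signature(a) == create_query_signature(b)) # True
```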
+ if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df_with_time = df[df['timestamp'].notna()].copy() + + if len(df_with_time) > 0: + df_with_time['hour'] = df_with_time['timestamp'].dt.hour + df_with_time['day_of_week'] = df_with_time['timestamp'].dt.day_name() + df_with_time['minute'] = df_with_time['timestamp'].dt.floor('min') + df_with_time['five_min'] = df_with_time['timestamp'].dt.floor('5min') + + # Requests per hour (identify peak times) + requests_by_hour = df_with_time.groupby('hour').size().to_dict() + results['requests_by_hour'] = {int(k): int(v) for k, v in requests_by_hour.items()} + + # Peak hour detection + if requests_by_hour: + max_requests = max(requests_by_hour.values()) + peak_hours = [k for k, v in requests_by_hour.items() if v == max_requests] + results['peak_hour'] = { + 'hour': int(peak_hours[0]) if peak_hours else None, + 'requests': int(max_requests) + } + + # Requests per minute (to see spikes) + requests_per_minute = df_with_time.groupby('minute').size().to_dict() + results['requests_per_minute'] = {str(k): int(v) for k, v in requests_per_minute.items()} + + # Peak minute detection + if requests_per_minute: + max_requests = max(requests_per_minute.values()) + peak_minutes = [k for k, v in requests_per_minute.items() if v == max_requests] + results['peak_minute'] = { + 'time': str(peak_minutes[0]) if peak_minutes else None, + 'requests': int(max_requests) + } + + # Requests per 5-minute window (for rate of change analysis) + requests_per_5min = df_with_time.groupby('five_min').size().to_dict() + results['requests_per_5min'] = {str(k): int(v) for k, v in requests_per_5min.items()} + + # Calculate rate of change (spikes) + if len(requests_per_5min) > 1: + sorted_times = sorted(requests_per_5min.keys()) + rate_changes = [] + for i in range(1, len(sorted_times)): + prev_count = requests_per_5min[sorted_times[i-1]] + curr_count = requests_per_5min[sorted_times[i]] + if prev_count > 0: + rate_change = ((curr_count - prev_count) / prev_count) * 100 + rate_changes.append({ + 'time': str(sorted_times[i]), + 'rate_change_pct': float(rate_change), + 'requests': int(curr_count) + }) + # Find largest spikes + if rate_changes: + largest_spikes = sorted(rate_changes, key=lambda x: abs(x['rate_change_pct']), reverse=True)[:5] + results['largest_traffic_spikes'] = largest_spikes + + # Requests by day of week + requests_by_dow = df_with_time.groupby('day_of_week').size().to_dict() + results['requests_by_day_of_week'] = {k: int(v) for k, v in requests_by_dow.items()} + + # 2. Cache miss patterns (cache misses are slower) + if 'cache_status' in df.columns: + df_with_cache = df[df['cache_status'].notna()].copy() + if len(df_with_cache) > 0: + # Cache miss rate by endpoint + if 'path' in df_with_cache.columns: + cache_by_endpoint = df_with_cache.groupby('path')['cache_status'].apply( + lambda x: (x == 'miss').sum() / len(x) * 100 + ).sort_values(ascending=False).head(20).to_dict() + results['high_cache_miss_endpoints'] = {k: float(v) for k, v in cache_by_endpoint.items()} + + # Cache miss rate by hour (when are cache misses most common?) 
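To make the per-endpoint cache-miss aggregation above concrete, here is the same groupby run on a tiny synthetic frame (paths and statuses are hypothetical); it mirrors the `high_cache_miss_endpoints` computation.

```python
import pandas as pd

# synthetic rows for illustration only
df = pd.DataFrame({
    "path": ["/quake/search", "/quake/search", "/health", "/health", "/health"],
    "cache_status": ["miss", "hit", "hit", "hit", "miss"],
})

# percentage of misses per path, largest first
miss_rate = (
    df.groupby("path")["cache_status"]
      .apply(lambda x: (x == "miss").sum() / len(x) * 100)
      .sort_values(ascending=False)
)
print(miss_rate.to_dict())  # {'/quake/search': 50.0, '/health': 33.3...}
```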
+ if 'timestamp' in df_with_cache.columns: + df_with_cache['timestamp'] = pd.to_datetime(df_with_cache['timestamp'], errors='coerce') + df_with_cache = df_with_cache[df_with_cache['timestamp'].notna()] + if len(df_with_cache) > 0: + df_with_cache['hour'] = df_with_cache['timestamp'].dt.hour + cache_miss_by_hour = df_with_cache.groupby('hour').apply( + lambda x: (x['cache_status'] == 'miss').sum() / len(x) * 100, + include_groups=False + ).to_dict() + results['cache_miss_rate_by_hour'] = {int(k): float(v) for k, v in cache_miss_by_hour.items()} + + # Hourly cache performance breakdown + hourly_cache = {} + for hour in df_with_cache['hour'].unique(): + hour_df = df_with_cache[df_with_cache['hour'] == hour] + hour_cache_counts = hour_df['cache_status'].value_counts().to_dict() + hour_total = len(hour_df) + hourly_cache[int(hour)] = { + 'hit_count': int(hour_cache_counts.get('hit', 0)), + 'miss_count': int(hour_cache_counts.get('miss', 0)), + 'hit_rate': float((hour_cache_counts.get('hit', 0) / hour_total) * 100) if hour_total > 0 else 0.0, + 'miss_rate': float((hour_cache_counts.get('miss', 0) / hour_total) * 100) if hour_total > 0 else 0.0, + } + results['hourly_cache_performance'] = hourly_cache + + # 3. Large response sizes (could indicate slow endpoints) + if 'response_size' in df.columns and 'path' in df.columns: + df_with_size = df[df['response_size'].notna() & df['path'].notna()].copy() + if len(df_with_size) > 0: + # Endpoints with largest average response sizes + large_responses = df_with_size.groupby('path')['response_size'].agg(['mean', 'count', 'max']).sort_values('mean', ascending=False).head(20) + results['large_response_endpoints'] = { + k: { + 'mean_size': float(v['mean']), + 'max_size': int(v['max']), + 'request_count': int(v['count']) + } + for k, v in large_responses.to_dict('index').items() + } + + # Very large responses (outliers) + p99_size = df_with_size['response_size'].quantile(0.99) + very_large = df_with_size[df_with_size['response_size'] > p99_size] + if len(very_large) > 0: + results['outlier_large_responses'] = { + 'p99_threshold': float(p99_size), + 'count': int(len(very_large)), + 'percentage': float(len(very_large) / len(df_with_size) * 100), + 'top_endpoints': very_large['path'].value_counts().head(10).to_dict() + } + + # Hourly response size breakdown + if 'timestamp' in df_with_size.columns: + df_with_size['timestamp'] = pd.to_datetime(df_with_size['timestamp'], errors='coerce') + df_with_size = df_with_size[df_with_size['timestamp'].notna()] + if len(df_with_size) > 0: + df_with_size['hour'] = df_with_size['timestamp'].dt.hour + hourly_sizes = {} + for hour in df_with_size['hour'].unique(): + hour_df = df_with_size[df_with_size['hour'] == hour] + hour_sizes = hour_df['response_size'].dropna() + if len(hour_sizes) > 0: + hourly_sizes[int(hour)] = { + 'mean_mb': float(hour_sizes.mean() / (1024 * 1024)), + 'median_mb': float(hour_sizes.median() / (1024 * 1024)), + 'p95_mb': float(hour_sizes.quantile(0.95) / (1024 * 1024)), + } + results['hourly_response_sizes'] = hourly_sizes + + # 4. 
Error correlation with slowness (errors might indicate slowness) + if 'status_code' in df.columns and 'path' in df.columns: + df_with_status = df[df['status_code'].notna() & df['path'].notna()].copy() + if len(df_with_status) > 0: + # Endpoints with high error rates (might be slow/failing) + error_rates = df_with_status.groupby('path').apply( + lambda x: (x['status_code'] >= 400).sum() / len(x) * 100, + include_groups=False + ).sort_values(ascending=False).head(20).to_dict() + results['high_error_rate_endpoints'] = {k: float(v) for k, v in error_rates.items()} + + # 5xx errors by hour (server issues might cause slowness) + if 'timestamp' in df_with_status.columns: + df_with_status['timestamp'] = pd.to_datetime(df_with_status['timestamp'], errors='coerce') + df_with_status = df_with_status[df_with_status['timestamp'].notna()] + if len(df_with_status) > 0: + df_with_status['hour'] = df_with_status['timestamp'].dt.hour + server_errors_by_hour = df_with_status[df_with_status['status_code'] >= 500].groupby('hour').size().to_dict() + results['server_errors_by_hour'] = {int(k): int(v) for k, v in server_errors_by_hour.items()} + + # Hourly error breakdown + hourly_errors = {} + for hour in df_with_status['hour'].unique(): + hour_df = df_with_status[df_with_status['hour'] == hour] + hour_total = len(hour_df) + hour_4xx = len(hour_df[(hour_df['status_code'] >= 400) & (hour_df['status_code'] < 500)]) + hour_5xx = len(hour_df[(hour_df['status_code'] >= 500) & (hour_df['status_code'] < 600)]) + hourly_errors[int(hour)] = { + 'total': int(hour_total), + '4xx_count': int(hour_4xx), + '4xx_percentage': float((hour_4xx / hour_total) * 100) if hour_total > 0 else 0.0, + '5xx_count': int(hour_5xx), + '5xx_percentage': float((hour_5xx / hour_total) * 100) if hour_total > 0 else 0.0, + } + results['hourly_error_rates'] = hourly_errors + + # 5. Query parameter patterns that might cause slowness + if 'query_params' in df.columns and 'path' in df.columns: + # Find endpoints with complex queries (many parameters) + complex_queries = [] + for entry in entries: + query_params = entry.get('query_params', {}) + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + param_count = len(query_params) if query_params else 0 + if param_count > 5: # More than 5 parameters might indicate complex queries + complex_queries.append({ + 'path': entry.get('path'), + 'param_count': param_count, + 'params': list(query_params.keys()) if query_params else [] + }) + + if complex_queries: + complex_df = pd.DataFrame(complex_queries) + if len(complex_df) > 0: + complex_by_endpoint = complex_df.groupby('path')['param_count'].agg(['mean', 'max', 'count']).sort_values('mean', ascending=False).head(20) + results['complex_query_endpoints'] = { + k: { + 'avg_params': float(v['mean']), + 'max_params': int(v['max']), + 'request_count': int(v['count']) + } + for k, v in complex_by_endpoint.to_dict('index').items() + } + + # 6. IP address patterns (maybe certain IPs are causing slowness) + if 'ip_address' in df.columns: + df_with_ip = df[df['ip_address'].notna()].copy() + if len(df_with_ip) > 0: + # Top IPs by request volume (might indicate bots/crawlers causing load) + top_ips = df_with_ip['ip_address'].value_counts().head(20).to_dict() + results['top_request_ips'] = {k: int(v) for k, v in top_ips.items()} + + # 7. 
User agent patterns (certain clients might be slower) + if 'user_agent' in df.columns and 'response_size' in df.columns: + df_with_ua = df[df['user_agent'].notna() & df['response_size'].notna()].copy() + if len(df_with_ua) > 0: + # Average response size by user agent (some clients might get different responses) + ua_response_sizes = df_with_ua.groupby('user_agent')['response_size'].agg(['mean', 'count']).sort_values('mean', ascending=False).head(10).to_dict('index') + results['user_agent_response_sizes'] = { + k: {'mean_size': float(v['mean']), 'request_count': int(v['count'])} + for k, v in ua_response_sizes.items() + } + + return results + + +def analyze_endpoint(entries: List[Dict], endpoint: str) -> Dict: + """ + Perform detailed analysis on a specific endpoint. + + Args: + entries: List of log entries + endpoint: Endpoint path to analyze + + Returns: + Dictionary with detailed endpoint analysis + """ + # Filter entries for this endpoint + endpoint_entries = [ + e for e in entries + if e.get('path') == endpoint + ] + + if not endpoint_entries: + return {'error': f'No entries found for endpoint: {endpoint}'} + + df = pd.DataFrame(endpoint_entries) + + results = { + 'endpoint': endpoint, + 'total_requests': len(endpoint_entries), + } + + # Time-based analysis + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df_with_time = df[df['timestamp'].notna()].copy() + + if len(df_with_time) > 0: + df_with_time['hour'] = df_with_time['timestamp'].dt.hour + df_with_time['day'] = df_with_time['timestamp'].dt.date + + # Requests by hour + requests_by_hour = df_with_time.groupby('hour').size().to_dict() + results['requests_by_hour'] = {int(k): int(v) for k, v in requests_by_hour.items()} + + # Peak hours + peak_hours = sorted(requests_by_hour.items(), key=lambda x: x[1], reverse=True)[:5] + results['peak_hours'] = [{'hour': int(h), 'count': int(c)} for h, c in peak_hours] + + # Status codes and errors + if 'status_code' in df.columns: + df_with_status = df[df['status_code'].notna()].copy() + status_counts = df_with_status['status_code'].value_counts().to_dict() + results['status_codes'] = {str(k): int(v) for k, v in status_counts.items()} + + total = len(df_with_status) + error_4xx = len(df_with_status[df_with_status['status_code'].between(400, 499)]) + error_5xx = len(df_with_status[df_with_status['status_code'].between(500, 599)]) + + results['error_analysis'] = { + 'total': int(total), + 'error_4xx': int(error_4xx), + 'error_4xx_rate': float(error_4xx / total * 100) if total > 0 else 0, + 'error_5xx': int(error_5xx), + 'error_5xx_rate': float(error_5xx / total * 100) if total > 0 else 0, + } + + # Response sizes + if 'response_size' in df.columns: + df_with_size = df[df['response_size'].notna()].copy() + if len(df_with_size) > 0: + results['response_size_stats'] = { + 'mean': float(df_with_size['response_size'].mean()), + 'median': float(df_with_size['response_size'].median()), + 'min': int(df_with_size['response_size'].min()), + 'max': int(df_with_size['response_size'].max()), + 'p95': float(df_with_size['response_size'].quantile(0.95)), + 'p99': float(df_with_size['response_size'].quantile(0.99)), + } + + # Cache performance + if 'cache_status' in df.columns: + df_with_cache = df[df['cache_status'].notna()].copy() + if len(df_with_cache) > 0: + cache_counts = df_with_cache['cache_status'].value_counts().to_dict() + total = len(df_with_cache) + hit_count = cache_counts.get('hit', 0) + miss_count = cache_counts.get('miss', 0) + + 
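Stepping back for a moment: once `analyze_endpoint` is assembled it can be pointed at a single path like this. The parsed-log path and the endpoint are placeholders for illustration, and the imports assume the repository root is on `sys.path`.

```python
from pathlib import Path
from src.analyze import load_data, analyze_endpoint  # assumes repo root on sys.path

# placeholder parsed-log path and endpoint, purely for illustration
entries = load_data(Path("logs/srv_quakesearch-fastly/parsed/parsed_logs.json"))
report = analyze_endpoint(entries, "/quake/search")

# analyze_endpoint returns {'error': ...} when no entries match the path,
# so use .get() rather than indexing when poking at the result
print(report.get("total_requests"))
print(report.get("cache_analysis", {}))
print(report.get("peak_hours", []))
```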
results['cache_analysis'] = { + 'total': int(total), + 'hit': int(hit_count), + 'miss': int(miss_count), + 'hit_rate': float(hit_count / total * 100) if total > 0 else 0, + 'miss_rate': float(miss_count / total * 100) if total > 0 else 0, + } + + # Query parameters + param_counts = Counter() + param_value_counts = defaultdict(Counter) + + for entry in endpoint_entries: + query_params = entry.get('query_params', {}) + if isinstance(query_params, str): + try: + query_params = json.loads(query_params) + except: + query_params = {} + + if query_params: + for param, value in query_params.items(): + param_counts[param] += 1 + param_value_counts[param][value] += 1 + + results['query_parameters'] = { + 'most_common': {k: int(v) for k, v in param_counts.most_common(10)}, + 'parameter_values': { + k: {vk: int(vv) for vk, vv in v.most_common(5)} + for k, v in list(param_value_counts.items())[:5] + } + } + + return results + + +def analyze_daily_summary(entries: List[Dict]) -> Dict: + """ + Analyze daily request totals with HTTP status code breakdown. + + Args: + entries: List of log entries + + Returns: + Dictionary with daily summary including status code breakdown + """ + if not entries: + return {'error': 'No entries found'} + + df = pd.DataFrame(entries) + + # Convert timestamp + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + df = df[df['timestamp'].notna()] + else: + return {'error': 'No timestamp field found in logs'} + + # Extract date (day) + df['date'] = df['timestamp'].dt.date + + results = { + 'total_requests': len(df), + 'date_range': { + 'start': str(df['date'].min()), + 'end': str(df['date'].max()), + }, + 'daily_summary': {} + } + + # Group by date + for date in sorted(df['date'].unique()): + day_df = df[df['date'] == date] + date_str = str(date) + + # Total requests for the day + total_requests = len(day_df) + + # Status code breakdown + status_breakdown = {} + if 'status_code' in day_df.columns: + status_counts = day_df['status_code'].dropna().value_counts().to_dict() + status_breakdown = {int(k): int(v) for k, v in status_counts.items()} + + # Calculate percentages + status_percentages = {} + for code, count in status_breakdown.items(): + status_percentages[code] = float((count / total_requests) * 100) if total_requests > 0 else 0.0 + + # Group by status code ranges + status_ranges = { + '1xx': sum(v for k, v in status_breakdown.items() if 100 <= k < 200), + '2xx': sum(v for k, v in status_breakdown.items() if 200 <= k < 300), + '3xx': sum(v for k, v in status_breakdown.items() if 300 <= k < 400), + '4xx': sum(v for k, v in status_breakdown.items() if 400 <= k < 500), + '5xx': sum(v for k, v in status_breakdown.items() if 500 <= k < 600), + } + + status_range_percentages = { + range_name: float((count / total_requests) * 100) if total_requests > 0 else 0.0 + for range_name, count in status_ranges.items() + } + else: + status_breakdown = {} + status_percentages = {} + status_ranges = {} + status_range_percentages = {} + + results['daily_summary'][date_str] = { + 'total_requests': int(total_requests), + 'status_codes': status_breakdown, + 'status_percentages': status_percentages, + 'status_ranges': status_ranges, + 'status_range_percentages': status_range_percentages, + } + + return results + + +def generate_report(analytics: Dict, output_format: str, output_path: Optional[Path] = None): + """Generate and output analytics report.""" + if output_format == 'json': + output = json.dumps(analytics, indent=2) + if output_path: + with 
open(output_path, 'w') as f: + f.write(output) + else: + print(output) + elif output_format == 'console': + print("\n" + "="*80) + print("FASTLY LOG ANALYTICS REPORT") + print("="*80) + + # Traffic Patterns + if 'traffic' in analytics: + tp = analytics['traffic'] + print("\n## Traffic Patterns") + print(f"Total Requests: {tp.get('total_requests', 0):,}") + print(f"\nHTTP Methods:") + for method, count in tp.get('http_methods', {}).items(): + print(f" {method}: {count:,}") + print(f"\nTop 10 Endpoints:") + for endpoint, count in list(tp.get('popular_endpoints', {}).items())[:10]: + print(f" {endpoint}: {count:,}") + + # Error Analysis + if 'errors' in analytics: + err = analytics['errors'] + print("\n## Error Analysis") + print(f"Total Requests: {err.get('total_requests', 0):,}") + print(f"4xx Errors: {err.get('error_4xx_count', 0):,} ({err.get('error_4xx_rate', 0):.2f}%)") + print(f"5xx Errors: {err.get('error_5xx_count', 0):,} ({err.get('error_5xx_rate', 0):.2f}%)") + print(f"Total Error Rate: {err.get('total_error_rate', 0):.2f}%") + print(f"\nStatus Code Distribution:") + for code, count in sorted(err.get('status_code_distribution', {}).items()): + print(f" {code}: {count:,}") + + # Performance + if 'performance' in analytics: + perf = analytics['performance'] + print("\n## Performance Metrics") + print(f"Cache Hit Rate: {perf.get('cache_hit_rate', 0):.2f}%") + print(f"Cache Miss Rate: {perf.get('cache_miss_rate', 0):.2f}%") + rs = perf.get('response_size_statistics', {}) + print(f"\nResponse Size Statistics:") + print(f" Mean: {rs.get('mean', 0):.2f} bytes") + print(f" Median: {rs.get('median', 0):.2f} bytes") + print(f" P95: {rs.get('p95', 0):.2f} bytes") + print(f" P99: {rs.get('p99', 0):.2f} bytes") + + # User Agents + if 'user_agents' in analytics: + ua = analytics['user_agents'] + print("\n## User Agent Analysis") + print(f"Agent Type Distribution:") + for agent_type, count in ua.get('agent_type_distribution', {}).items(): + print(f" {agent_type}: {count:,}") + + # Query Patterns + if 'query_patterns' in analytics: + qp = analytics['query_patterns'] + print("\n## Query Parameter Analysis") + print(f"Most Common Parameters:") + for param, count in list(qp.get('most_common_parameters', {}).items())[:10]: + print(f" {param}: {count:,}") + + # Slowness Investigation + if 'slowness_investigation' in analytics: + slow = analytics['slowness_investigation'] + print("\n## Slowness Investigation") + + # Time-based patterns + if 'requests_by_hour' in slow: + print("\n### Traffic by Hour (identify peak times)") + peak_hours = sorted(slow['requests_by_hour'].items(), key=lambda x: x[1], reverse=True)[:5] + for hour, count in peak_hours: + print(f" Hour {hour:02d}:00 - {count:,} requests") + + if 'cache_miss_rate_by_hour' in slow: + print("\n### Cache Miss Rate by Hour (cache misses are slower)") + for hour in sorted(slow['cache_miss_rate_by_hour'].keys()): + rate = slow['cache_miss_rate_by_hour'][hour] + print(f" Hour {hour:02d}:00 - {rate:.1f}% cache miss rate") + + # High cache miss endpoints + if 'high_cache_miss_endpoints' in slow: + print("\n### Endpoints with High Cache Miss Rates (>50%)") + high_miss = {k: v for k, v in slow['high_cache_miss_endpoints'].items() if v > 50} + if high_miss: + for endpoint, rate in sorted(high_miss.items(), key=lambda x: x[1], reverse=True)[:10]: + print(f" {endpoint}: {rate:.1f}% miss rate") + else: + print(" (No endpoints with >50% cache miss rate)") + + # Large response sizes + if 'large_response_endpoints' in slow: + print("\n### Endpoints with Largest 
Average Response Sizes") + for endpoint, data in list(slow['large_response_endpoints'].items())[:10]: + size_mb = data['mean_size'] / (1024 * 1024) + print(f" {endpoint}: {size_mb:.2f} MB avg ({data['request_count']:,} requests)") + + # Outlier large responses + if 'outlier_large_responses' in slow: + outlier = slow['outlier_large_responses'] + print(f"\n### Very Large Responses (Outliers)") + print(f" P99 threshold: {outlier['p99_threshold'] / (1024*1024):.2f} MB") + print(f" Outlier count: {outlier['count']:,} ({outlier['percentage']:.2f}% of requests)") + if 'top_endpoints' in outlier and outlier['top_endpoints']: + print(f" Top endpoints with outliers:") + for endpoint, count in list(outlier['top_endpoints'].items())[:5]: + print(f" {endpoint}: {count:,}") + + # High error rate endpoints + if 'high_error_rate_endpoints' in slow: + print("\n### Endpoints with High Error Rates (might indicate slowness)") + high_errors = {k: v for k, v in slow['high_error_rate_endpoints'].items() if v > 5} + if high_errors: + for endpoint, rate in sorted(high_errors.items(), key=lambda x: x[1], reverse=True)[:10]: + print(f" {endpoint}: {rate:.1f}% error rate") + else: + print(" (No endpoints with >5% error rate)") + + # Server errors by hour + if 'server_errors_by_hour' in slow and slow['server_errors_by_hour']: + print("\n### Server Errors (5xx) by Hour") + for hour in sorted(slow['server_errors_by_hour'].keys()): + count = slow['server_errors_by_hour'][hour] + print(f" Hour {hour:02d}:00 - {count:,} server errors") + + # Complex queries + if 'complex_query_endpoints' in slow: + print("\n### Endpoints with Complex Queries (>5 parameters avg)") + for endpoint, data in list(slow['complex_query_endpoints'].items())[:10]: + print(f" {endpoint}: {data['avg_params']:.1f} avg params ({data['request_count']:,} requests)") + + # Top request IPs + if 'top_request_ips' in slow: + print("\n### Top Request IPs (might indicate bots/crawlers)") + for ip, count in list(slow['top_request_ips'].items())[:10]: + print(f" {ip}: {count:,} requests") + + print("\n" + "="*80) + + if output_path: + # Also save JSON to file + json_output = json.dumps(analytics, indent=2) + with open(output_path, 'w') as f: + f.write(json_output) + else: + raise ValueError(f"Unsupported output format: {output_format}") + + +def main(): + parser = argparse.ArgumentParser(description='Analyze parsed Fastly log data') + parser.add_argument( + '--input', + type=str, + required=True, + help='Input file (parsed JSON or CSV)' + ) + parser.add_argument( + '--output', + type=str, + help='Output file path (optional)' + ) + parser.add_argument( + '--format', + choices=['json', 'console'], + default='console', + help='Output format (default: console)' + ) + + args = parser.parse_args() + + input_path = Path(args.input) + if not input_path.exists(): + print(f"Error: Input file does not exist: {input_path}", file=sys.stderr) + sys.exit(1) + + print(f"Loading data from {input_path}...") + entries = load_data(input_path) + print(f"Loaded {len(entries):,} log entries") + + print("Generating analytics...") + analytics = { + 'traffic': analyze_traffic_patterns(entries), + 'errors': analyze_errors(entries), + 'performance': analyze_performance(entries), + 'user_agents': analyze_user_agents(entries), + 'query_patterns': analyze_query_patterns(entries), + 'slowness_investigation': analyze_slowness_patterns(entries) + } + + output_path = Path(args.output) if args.output else None + generate_report(analytics, args.format, output_path) + + if output_path: + 
print(f"\nReport saved to {output_path}") + print("Done!") + + +if __name__ == '__main__': + main() + diff --git a/src/parse/__init__.py b/src/parse/__init__.py new file mode 100644 index 0000000..4cb274a --- /dev/null +++ b/src/parse/__init__.py @@ -0,0 +1,8 @@ +""" +Log parsing modules +""" + +from .log_parser import parse_log_line, process_log_file, save_json_streaming, save_csv_streaming + +__all__ = ['parse_log_line', 'process_log_file', 'save_json_streaming', 'save_csv_streaming'] + diff --git a/src/parse/log_parser.py b/src/parse/log_parser.py new file mode 100755 index 0000000..89b4d6e --- /dev/null +++ b/src/parse/log_parser.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python3 +""" +Fastly Log Parser +Parses syslog-style Fastly log entries and converts them to structured data. +""" + +import re +import gzip +import json +import csv +import argparse +from pathlib import Path +from typing import Dict, List, Optional +from datetime import datetime +import sys + + +# Fastly log format regex pattern +# Format: timestamp cache-server process[pid]: IP "-" "-" date "METHOD path" status size "-" "user-agent" cache-status +LOG_PATTERN = re.compile( + r'<(\d+)>' # Priority code + r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)' # Timestamp + r'\s+(\S+)' # Cache server + r'\s+(\S+)\[(\d+)\]:' # Process and PID + r'\s+(\S+)' # IP address + r'\s+"([^"]*)"' # First "-" field (usually referrer) + r'\s+"([^"]*)"' # Second "-" field + r'\s+([^"]+?)(?=\s+")' # Date string (non-greedy until next quote) + r'\s+"([A-Z]+)\s+([^"]+)"' # HTTP method and path + r'\s+(\d+)' # Status code + r'\s+(\d+)' # Response size + r'\s+"([^"]*)"' # Referrer + r'\s+"([^"]*)"' # User agent + r'\s+(\S+)' # Cache status (hit/miss) +) + + +def safe_int(value, default=None): + """Safely convert value to int, return default if fails.""" + try: + return int(value) if value else default + except (ValueError, TypeError): + return default + +def safe_get(groups, index, default=None): + """Safely get group from regex match, return default if out of bounds.""" + try: + return groups[index] if index < len(groups) and groups[index] else default + except (IndexError, TypeError): + return default + +def parse_log_line(line: str) -> Optional[Dict]: + """ + Parse a single log line and return structured data. + Truly lazy parsing - extracts whatever fields are available using individual patterns. + Doesn't rely on a fixed format - works with any log structure. 
+ + Args: + line: Raw log line string + + Returns: + Dictionary with parsed fields or None if line is empty + """ + line = line.strip() + if not line: + return None + + # Start with raw line + result = {'raw_line': line} + + # Try LOG_PATTERN first as an optimization (faster for standard format) + match = LOG_PATTERN.match(line) + if match: + groups = match.groups() + result['priority'] = safe_int(safe_get(groups, 0)) + timestamp_str = safe_get(groups, 1) + if timestamp_str: + try: + timestamp = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%SZ') + result['timestamp'] = timestamp.isoformat() + except (ValueError, TypeError): + result['timestamp'] = None + result['cache_server'] = safe_get(groups, 2) + result['process'] = safe_get(groups, 3) + result['pid'] = safe_int(safe_get(groups, 4)) + result['ip_address'] = safe_get(groups, 5) + result['referrer1'] = safe_get(groups, 6) + result['referrer2'] = safe_get(groups, 7) + result['date_string'] = safe_get(groups, 8) + method = safe_get(groups, 9) + full_path = safe_get(groups, 10) + if full_path: + path_parts = full_path.split('?', 1) + result['path'] = path_parts[0] + result['query_string'] = path_parts[1] if len(path_parts) > 1 else None + query_params = {} + if result['query_string']: + for param in result['query_string'].split('&'): + if '=' in param: + key, value = param.split('=', 1) + query_params[key] = value + result['query_params'] = query_params + result['http_method'] = method + result['status_code'] = safe_int(safe_get(groups, 11)) + result['response_size'] = safe_int(safe_get(groups, 12)) + result['referrer'] = safe_get(groups, 13) + result['user_agent'] = safe_get(groups, 14) + result['cache_status'] = safe_get(groups, 15) + return result + + # Fallback: Extract fields individually (lazy mode - works with any format) + # Extract timestamp (ISO format with Z) + timestamp_match = re.search(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)', line) + if timestamp_match: + try: + timestamp = datetime.strptime(timestamp_match.group(1), '%Y-%m-%dT%H:%M:%SZ') + result['timestamp'] = timestamp.isoformat() + except ValueError: + pass + + # Extract priority code + priority_match = re.search(r'<(\d+)>', line) + if priority_match: + result['priority'] = safe_int(priority_match.group(1)) + + # Extract IP address + ip_match = re.search(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', line) + if ip_match: + result['ip_address'] = ip_match.group(1) + + # Extract HTTP method and path + http_match = re.search(r'"([A-Z]+)\s+([^"]+)"', line) + if http_match: + result['http_method'] = http_match.group(1) + full_path = http_match.group(2) + path_parts = full_path.split('?', 1) + result['path'] = path_parts[0] + result['query_string'] = path_parts[1] if len(path_parts) > 1 else None + # Parse query parameters + if result.get('query_string'): + query_params = {} + for param in result['query_string'].split('&'): + if '=' in param: + key, value = param.split('=', 1) + query_params[key] = value + result['query_params'] = query_params + else: + result['query_params'] = {} + + # Extract status code (3-digit number) + status_match = re.search(r'\s(\d{3})\s', line) + if status_match: + result['status_code'] = safe_int(status_match.group(1)) + + # Extract response size (number after status code) + size_match = re.search(r'\s(\d{3})\s+(\d+)\s', line) + if size_match: + result['response_size'] = safe_int(size_match.group(2)) + + # Extract user agent (in quotes) + ua_match = re.search(r'"([^"]*Mozilla[^"]*)"', line) + if ua_match: + result['user_agent'] = ua_match.group(1) 
+ else: + # Try any quoted string that looks like a user agent + ua_match = re.search(r'"([^"]{20,})"', line) + if ua_match and 'Mozilla' in ua_match.group(1): + result['user_agent'] = ua_match.group(1) + + # Extract cache status (hit/miss/etc) + cache_match = re.search(r'\s(hit|miss|pass|error|synth)\s*$', line) + if cache_match: + result['cache_status'] = cache_match.group(1) + + # Extract cache server (word before process) + server_match = re.search(r'cache-([^\s]+)', line) + if server_match: + result['cache_server'] = 'cache-' + server_match.group(1) + + # Extract process[pid] + process_match = re.search(r'(\S+)\[(\d+)\]:', line) + if process_match: + result['process'] = process_match.group(1) + result['pid'] = safe_int(process_match.group(2)) + + return result + + +def process_log_file(file_path: Path): + """ + Process a log file (compressed or uncompressed) and yield parsed entries lazily. + + Args: + file_path: Path to the log file + + Yields: + Parsed log entry dictionaries + """ + try: + if file_path.suffix == '.gz': + with gzip.open(file_path, 'rt', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + entry = parse_log_line(line) + if entry: + entry['source_file'] = str(file_path) + entry['line_number'] = line_num + yield entry + else: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + for line_num, line in enumerate(f, 1): + entry = parse_log_line(line) + if entry: + entry['source_file'] = str(file_path) + entry['line_number'] = line_num + yield entry + except Exception as e: + print(f"Error processing {file_path}: {e}", file=sys.stderr) + + +def save_json_streaming(entries, output_path: Path): + """Save parsed data as JSON using streaming (memory efficient).""" + with open(output_path, 'w') as f: + f.write('[\n') + first = True + for entry in entries: + if not first: + f.write(',\n') + json.dump(entry, f, indent=2) + first = False + f.write('\n]') + + +def save_csv_streaming(entries, output_path: Path): + """Save parsed data as CSV using streaming (memory efficient).""" + writer = None + first_entry = True + + with open(output_path, 'w', newline='') as f: + for entry in entries: + # Flatten query_params for CSV + flat_entry = entry.copy() + # Convert query_params dict to string for CSV + flat_entry['query_params'] = json.dumps(entry['query_params']) + + if first_entry: + # Initialize writer with fieldnames from first entry + fieldnames = flat_entry.keys() + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + first_entry = False + + writer.writerow(flat_entry) + + +def main(): + parser = argparse.ArgumentParser(description='Parse Fastly log files') + parser.add_argument( + '--input-dir', + type=str, + default='./logs', + help='Directory containing log files (default: ./logs)' + ) + parser.add_argument( + '--output', + type=str, + help='Output file path (default: parsed_logs.json)' + ) + parser.add_argument( + '--format', + choices=['json', 'csv'], + default='json', + help='Output format (default: json)' + ) + parser.add_argument( + '--pattern', + type=str, + default='*.log*', + help='File pattern to match (default: *.log*)' + ) + + args = parser.parse_args() + + input_dir = Path(args.input_dir) + if not input_dir.exists(): + print(f"Error: Input directory does not exist: {input_dir}", file=sys.stderr) + sys.exit(1) + + # Find all log files + log_files = [] + for pattern in [args.pattern, '*.log.gz', '*.log']: + log_files.extend(input_dir.glob(pattern)) + + # Remove duplicates + log_files = 
list(set(log_files)) + + if not log_files: + print(f"No log files found in {input_dir}", file=sys.stderr) + sys.exit(1) + + print(f"Found {len(log_files)} log file(s)") + print("Parsing logs (lazy/streaming mode - memory efficient)...") + + # Generator function that yields all entries from all files + total_count = 0 + def all_entries_generator(): + nonlocal total_count + for log_file in sorted(log_files): + print(f" Processing: {log_file.name}") + file_count = 0 + for entry in process_log_file(log_file): + file_count += 1 + total_count += 1 + yield entry + print(f" Parsed {file_count} entries") + + # Determine output path + if args.output: + output_path = Path(args.output) + else: + output_path = Path(args.input_dir) / f"parsed_logs.{args.format}" + + # Check if output file exists and warn + if output_path.exists(): + print(f"Warning: Output file already exists: {output_path}") + print(f" It will be OVERWRITTEN with new data.") + print(f" (Previous data will be lost)") + + # Save output using streaming (overwrites existing file) + print(f"Saving to {output_path}...") + if args.format == 'json': + save_json_streaming(all_entries_generator(), output_path) + else: + save_csv_streaming(all_entries_generator(), output_path) + + print(f"\nTotal entries parsed: {total_count}") + + print("Done!") + + +if __name__ == '__main__': + main() + diff --git a/src/sync/__init__.py b/src/sync/__init__.py new file mode 100644 index 0000000..a7070be --- /dev/null +++ b/src/sync/__init__.py @@ -0,0 +1,10 @@ +""" +Log synchronization modules +""" + +from .base import BaseSync +from .s3_sync import S3Sync +from .sync_manager import SyncManager + +__all__ = ['BaseSync', 'S3Sync', 'SyncManager'] + diff --git a/src/sync/base.py b/src/sync/base.py new file mode 100644 index 0000000..d7730f0 --- /dev/null +++ b/src/sync/base.py @@ -0,0 +1,48 @@ +""" +Base sync class/interface +""" + +from abc import ABC, abstractmethod +from typing import Tuple, Dict +from pathlib import Path + +class BaseSync(ABC): + """Base class for log synchronization.""" + + def __init__(self, source_name: str, source_config: Dict): + """ + Initialize sync instance. + + Args: + source_name: Name of the log source + source_config: Configuration dictionary for this source + """ + self.source_name = source_name + self.config = source_config + self.local_dir = Path(source_config.get('local_dir', f"logs/{source_name}/raw")) + + @abstractmethod + def sync(self, start_date: str, end_date: str, max_workers: int = 10) -> Tuple[int, int, int]: + """ + Sync logs for the specified date range. + + Args: + start_date: Start date in YYYY-MM-DD format + end_date: End date in YYYY-MM-DD format + max_workers: Number of concurrent workers + + Returns: + Tuple of (downloads, skips, errors) + """ + pass + + @abstractmethod + def test_connection(self) -> bool: + """ + Test connection to the log source. 
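The CLI above wires these pieces together, but the streaming helpers can also be driven directly. A small sketch, with illustrative paths, that parses every `.log.gz` file under a directory and writes a single JSON array without holding all entries in memory:

```python
from pathlib import Path
from src.parse.log_parser import process_log_file, save_json_streaming

log_dir = Path("logs/raw")               # illustrative input directory
output = Path("logs/parsed_logs.json")   # illustrative output path

def entries():
    # process_log_file is a generator, so this stays memory-efficient
    for log_file in sorted(log_dir.glob("*.log.gz")):
        yield from process_log_file(log_file)

save_json_streaming(entries(), output)
```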
+ + Returns: + True if connection is successful + """ + pass + diff --git a/src/sync/s3_sync.py b/src/sync/s3_sync.py new file mode 100644 index 0000000..7a788fb --- /dev/null +++ b/src/sync/s3_sync.py @@ -0,0 +1,233 @@ +""" +S3-specific sync implementation +""" + +import os +from pathlib import Path +from datetime import datetime, timedelta +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Tuple, Optional +import boto3 +from botocore.exceptions import ClientError, BotoCoreError + +from .base import BaseSync + +# Colors for output +class Colors: + RED = '\033[0;31m' + GREEN = '\033[0;32m' + YELLOW = '\033[1;33m' + BLUE = '\033[0;34m' + NC = '\033[0m' # No Color + +class S3Sync(BaseSync): + """S3 log synchronization implementation.""" + + def __init__(self, source_name: str, source_config: dict): + super().__init__(source_name, source_config) + self.s3_bucket = source_config.get('s3_bucket') + self.bucket_name, self.bucket_path = self._parse_s3_uri(self.s3_bucket) + self.s3_client = None + + def _parse_s3_uri(self, s3_uri: str) -> Tuple[str, str]: + """Parse S3 URI into bucket name and path.""" + s3_uri = s3_uri.replace("s3://", "") + parts = s3_uri.split("/", 1) + bucket_name = parts[0] + bucket_path = parts[1] if len(parts) > 1 else "" + if bucket_path and not bucket_path.endswith("/"): + bucket_path += "/" + return bucket_name, bucket_path + + def _create_s3_client(self): + """Create S3 client with optional profile.""" + profile = self.config.get('credentials', {}).get('profile') + + if profile: + session = boto3.Session(profile_name=profile) + return session.client('s3') + else: + return boto3.client('s3') + + def test_connection(self) -> bool: + """Test connection to S3 bucket.""" + try: + if self.s3_client is None: + self.s3_client = self._create_s3_client() + self.s3_client.head_bucket(Bucket=self.bucket_name) + return True + except (ClientError, BotoCoreError): + return False + + def _list_s3_files(self, prefix: str) -> List[str]: + """List all files in S3 matching the prefix pattern.""" + files = [] + paginator = self.s3_client.get_paginator('list_objects_v2') + + try: + for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix): + if 'Contents' in page: + for obj in page['Contents']: + key = obj['Key'] + # Filter for log files + if key.endswith('.log.gz') or key.endswith('.log'): + files.append(key) + except ClientError as e: + print(f"{Colors.RED}Error listing files: {e}{Colors.NC}") + return [] + + return files + + def _get_s3_object_size(self, key: str) -> Optional[int]: + """Get the size of an S3 object.""" + try: + response = self.s3_client.head_object(Bucket=self.bucket_name, Key=key) + return response.get('ContentLength', 0) + except ClientError: + return None + + def _download_file(self, s3_key: str, local_file: Path, + file_num: int, total_files: int) -> Tuple[bool, str]: + """Download a single file from S3.""" + filename = local_file.name + + # Check if file exists and compare size + needs_download = True + if local_file.exists(): + s3_size = self._get_s3_object_size(s3_key) + if s3_size is not None: + local_size = local_file.stat().st_size + if s3_size == local_size and s3_size > 0: + needs_download = False + + if not needs_download: + return (False, f"{Colors.YELLOW} ⊘ [{file_num}/{total_files}] Skipped (up to date): {filename}{Colors.NC}") + + # Download the file + try: + self.s3_client.download_file(self.bucket_name, s3_key, str(local_file)) + return (True, f"{Colors.GREEN} ✓ [{file_num}/{total_files}] Downloaded: 
{filename}{Colors.NC}") + except ClientError as e: + return (False, f"{Colors.RED} ✗ [{file_num}/{total_files}] Failed: {filename} - {e}{Colors.NC}") + + def _sync_date(self, date_str: str, max_workers: int = 10) -> Tuple[int, int, int]: + """Sync all files for a specific date.""" + date_prefix = date_str + prefix = f"{self.bucket_path}{date_prefix}" + + print(f"{Colors.YELLOW}Syncing logs for date: {date_str}{Colors.NC}") + print(f"{Colors.BLUE} Listing files for {date_str} (prefix: {date_prefix})...{Colors.NC}") + + # List all files for this date + files = self._list_s3_files(prefix) + file_count = len(files) + + if file_count == 0: + print(f"{Colors.YELLOW} ⊘ No files found for this date{Colors.NC}") + return (0, 0, 0) + + print(f"{Colors.BLUE} Found {file_count} file(s) - syncing with {max_workers} concurrent workers...{Colors.NC}") + + # Directory should already exist from sync() method, but ensure it exists + self.local_dir.mkdir(parents=True, exist_ok=True) + + downloads = 0 + skips = 0 + errors = 0 + + # Download files concurrently + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all download tasks + future_to_file = {} + for idx, s3_key in enumerate(files, 1): + filename = os.path.basename(s3_key) + local_file = self.local_dir / filename + future = executor.submit(self._download_file, s3_key, local_file, idx, file_count) + future_to_file[future] = (idx, filename) + + # Process completed downloads and show progress + completed = 0 + for future in as_completed(future_to_file): + completed += 1 + file_num, filename = future_to_file[future] + try: + is_download, message = future.result() + print(f"\r{message}", end="", flush=True) + + if is_download: + downloads += 1 + else: + if "Skipped" in message: + skips += 1 + else: + errors += 1 + + # Show overall progress + print(f"\r{Colors.BLUE} Progress: {completed}/{file_count} files processed...{Colors.NC}", + end="", flush=True) + except Exception as e: + print(f"\r{Colors.RED} ✗ [{file_num}/{file_count}] Error: {filename} - {e}{Colors.NC}") + errors += 1 + + print() # New line after progress + + # Show summary + if downloads > 0: + print(f"{Colors.GREEN} ✓ Downloaded/updated {downloads} file(s){Colors.NC}") + if skips > 0: + print(f"{Colors.YELLOW} ⊘ Skipped {skips} file(s) (up to date){Colors.NC}") + + return (downloads, skips, errors) + + def sync(self, start_date: str, end_date: str, max_workers: int = 10) -> Tuple[int, int, int]: + """Sync logs for the specified date range.""" + # Create local directory if it doesn't exist (do this early) + self.local_dir.mkdir(parents=True, exist_ok=True) + + # Initialize S3 client + if self.s3_client is None: + try: + self.s3_client = self._create_s3_client() + # Test credentials + self.s3_client.head_bucket(Bucket=self.bucket_name) + except (ClientError, BotoCoreError) as e: + print(f"{Colors.RED}Error: AWS credentials not configured or invalid for source '{self.source_name}'{Colors.NC}") + print(f"Please configure AWS credentials: {e}") + return (0, 0, 1) + + print(f"\n{Colors.GREEN}{'='*60}{Colors.NC}") + print(f"{Colors.GREEN}Syncing source: {self.source_name}{Colors.NC}") + print(f"{Colors.BLUE}Description: {self.config.get('description', 'N/A')}{Colors.NC}") + print(f"{Colors.BLUE}S3 Bucket: {self.s3_bucket}{Colors.NC}") + print(f"{Colors.BLUE}Local Directory: {self.local_dir}{Colors.NC}") + print(f"{Colors.GREEN}{'='*60}{Colors.NC}\n") + + # Validate dates + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + end_dt = datetime.strptime(end_date, "%Y-%m-%d") + + 
# Generate date range + current_date = start_dt + total_downloads = 0 + total_skips = 0 + total_errors = 0 + + while current_date <= end_dt: + date_str = current_date.strftime("%Y-%m-%d") + downloads, skips, errors = self._sync_date(date_str, max_workers) + + total_downloads += downloads + total_skips += skips + total_errors += errors + + # Move to next date + current_date += timedelta(days=1) + + print(f"\n{Colors.GREEN}Source '{self.source_name}' sync completed!{Colors.NC}") + print(f" New files downloaded: {total_downloads}") + print(f" Files skipped (already exist): {total_skips}") + if total_errors > 0: + print(f" {Colors.RED}Errors encountered: {total_errors}{Colors.NC}") + + return (total_downloads, total_skips, total_errors) + diff --git a/src/sync/sync_manager.py b/src/sync/sync_manager.py new file mode 100644 index 0000000..c193136 --- /dev/null +++ b/src/sync/sync_manager.py @@ -0,0 +1,71 @@ +""" +Sync manager - orchestrates multiple log sources +""" + +from typing import Dict, Tuple +from .base import BaseSync +from .s3_sync import S3Sync + +def create_sync_instance(source_name: str, source_config: Dict) -> BaseSync: + """ + Create appropriate sync instance based on source type. + + Args: + source_name: Name of the log source + source_config: Configuration dictionary for this source + + Returns: + BaseSync instance + """ + source_type = source_config.get('type', '').lower() + + if source_type == 's3': + return S3Sync(source_name, source_config) + else: + raise ValueError(f"Unsupported source type: {source_type}") + +class SyncManager: + """Manages synchronization of multiple log sources.""" + + def __init__(self, sources: Dict): + """ + Initialize sync manager. + + Args: + sources: Dictionary of source configurations + """ + self.sources = sources + self.sync_instances = {} + + def get_sync_instance(self, source_name: str) -> BaseSync: + """Get or create sync instance for a source.""" + if source_name not in self.sync_instances: + if source_name not in self.sources: + raise ValueError(f"Source '{source_name}' not found in configuration") + self.sync_instances[source_name] = create_sync_instance( + source_name, self.sources[source_name] + ) + return self.sync_instances[source_name] + + def sync_source(self, source_name: str, start_date: str, end_date: str, + max_workers: int = 10) -> Tuple[int, int, int]: + """Sync a specific source.""" + sync_instance = self.get_sync_instance(source_name) + return sync_instance.sync(start_date, end_date, max_workers) + + def sync_all(self, start_date: str, end_date: str, + max_workers: int = 10) -> Dict[str, Tuple[int, int, int]]: + """ + Sync all enabled sources. 
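Using `S3Sync` directly, outside `SyncManager`, looks roughly like the sketch below. The dictionary keys mirror what `BaseSync`/`S3Sync` read from configuration; the bucket, profile, and directory values are purely illustrative:

```python
from src.sync.s3_sync import S3Sync

source_config = {
    "type": "s3",
    "description": "Fastly CDN logs",             # shown in the sync banner
    "s3_bucket": "s3://example-bucket/fastly/",   # split into bucket name + prefix
    "local_dir": "logs/example/raw",              # default would be logs/<name>/raw
    "credentials": {"profile": "default"},        # optional named AWS profile
    "enabled": True,
}

syncer = S3Sync("example", source_config)
if syncer.test_connection():
    downloads, skips, errors = syncer.sync("2025-11-10", "2025-11-11", max_workers=10)
    print(f"{downloads} downloaded, {skips} skipped, {errors} errors")
```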
+ + Returns: + Dictionary mapping source names to (downloads, skips, errors) tuples + """ + results = {} + for source_name in self.sources: + if self.sources[source_name].get('enabled', False): + results[source_name] = self.sync_source( + source_name, start_date, end_date, max_workers + ) + return results + diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..82f02f1 --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,9 @@ +""" +Utility functions for log processing +""" + +from .config_loader import load_config, get_enabled_sources +from .date_utils import validate_date, parse_date_range + +__all__ = ['load_config', 'get_enabled_sources', 'validate_date', 'parse_date_range'] + diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py new file mode 100644 index 0000000..97833f3 --- /dev/null +++ b/src/utils/config_loader.py @@ -0,0 +1,51 @@ +""" +Configuration loading utilities +""" + +import sys +import yaml +from pathlib import Path +from typing import Dict + +# Colors for output +class Colors: + RED = '\033[0;31m' + GREEN = '\033[0;32m' + YELLOW = '\033[1;33m' + BLUE = '\033[0;34m' + NC = '\033[0m' # No Color + +# Default config path +DEFAULT_CONFIG_PATH = Path(__file__).parent.parent.parent / "config" / "log_sources.yaml" + +def load_config(config_path: Path = None) -> Dict: + """Load log sources configuration from YAML file.""" + if config_path is None: + config_path = DEFAULT_CONFIG_PATH + + if not config_path.exists(): + print(f"{Colors.RED}Error: Configuration file not found: {config_path}{Colors.NC}", file=sys.stderr) + print(f"Please create {config_path} with log source definitions.", file=sys.stderr) + sys.exit(1) + + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + + if not config or 'log_sources' not in config: + print(f"{Colors.RED}Error: Invalid configuration file format{Colors.NC}", file=sys.stderr) + sys.exit(1) + + return config['log_sources'] + except yaml.YAMLError as e: + print(f"{Colors.RED}Error: Failed to parse configuration file: {e}{Colors.NC}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"{Colors.RED}Error: Failed to load configuration: {e}{Colors.NC}", file=sys.stderr) + sys.exit(1) + +def get_enabled_sources(sources: Dict) -> Dict: + """Get all enabled log sources.""" + return {name: config for name, config in sources.items() + if config.get('enabled', False)} + diff --git a/src/utils/date_utils.py b/src/utils/date_utils.py new file mode 100644 index 0000000..ab30a53 --- /dev/null +++ b/src/utils/date_utils.py @@ -0,0 +1,61 @@ +""" +Date and time utility functions +""" + +import sys +from datetime import datetime, timedelta +from typing import Tuple, Optional + +# Colors for output +class Colors: + RED = '\033[0;31m' + NC = '\033[0m' # No Color + +def validate_date(date_str: str) -> None: + """Validate date format.""" + try: + datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + print(f"{Colors.RED}Error: Invalid date format: {date_str}{Colors.NC}", file=sys.stderr) + print("Date must be in YYYY-MM-DD format", file=sys.stderr) + sys.exit(1) + +def parse_date_range(start_date: Optional[str] = None, + end_date: Optional[str] = None, + single_date: Optional[str] = None) -> Tuple[str, str]: + """ + Parse date range from arguments. 
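Tying the configuration loader and the sync manager together is the usual entry point for the scripts; a sketch, assuming `config/log_sources.yaml` exists at the default location:

```python
from src.sync.sync_manager import SyncManager
from src.utils.config_loader import load_config

sources = load_config()   # exits with an error if the YAML is missing or invalid
manager = SyncManager(sources)

# Only sources flagged enabled: true are synced
results = manager.sync_all("2025-11-10", "2025-11-11", max_workers=10)
for name, (downloads, skips, errors) in results.items():
    print(f"{name}: {downloads} downloaded, {skips} skipped, {errors} errors")
```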
+
+    Args:
+        start_date: Start date in YYYY-MM-DD format
+        end_date: End date in YYYY-MM-DD format
+        single_date: Single date (will sync from this date to today)
+
+    Returns:
+        Tuple of (start_date, end_date) as strings
+    """
+    if single_date:
+        start_date = single_date
+        # Set end date to today in UTC to match how logs are organized in S3
+        from datetime import timezone
+        end_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+    if not start_date or not end_date:
+        raise ValueError("Start date and end date are required")
+
+    # Validate date formats
+    validate_date(start_date)
+    validate_date(end_date)
+
+    # Validate date range ordering
+    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
+    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
+    if start_dt > end_dt:
+        print(f"{Colors.RED}Error: Start date must be before or equal to end date{Colors.NC}", file=sys.stderr)
+        sys.exit(1)
+
+    return (start_date, end_date)
+
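For completeness, a small sketch of the date helpers as the scripts use them; note that a single-date input expands to a range ending at today's UTC date:

```python
from src.utils.date_utils import parse_date_range, validate_date

validate_date("2025-11-10")   # exits with an error message if the format is invalid

# Explicit range
start, end = parse_date_range(start_date="2025-11-10", end_date="2025-11-12")

# Single date: the end date becomes today's date in UTC
start, end = parse_date_range(single_date="2025-11-10")
print(start, end)
```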