From eea5fe8a8d771c8a7b1f05aa221df81bfb39ecb1 Mon Sep 17 00:00:00 2001 From: John Ajera Date: Thu, 27 Nov 2025 17:14:45 +1300 Subject: [PATCH] feat: enhance scripts updates scripts --- dashboard/dashboard.py | 78 +++++++-- scripts/analyze_logs.py | 336 ++++++++++++++++++++++++++++++++++++--- scripts/parse_logs.py | 94 ++++++++--- scripts/query_logs.py | 6 +- scripts/sync_logs.py | 6 +- src/analyze/analytics.py | 88 ++++++++++ src/sync/s3_sync.py | 47 ++++-- src/utils/date_utils.py | 58 +++++-- 8 files changed, 628 insertions(+), 85 deletions(-) diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index 52578b2..157091c 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -524,33 +524,83 @@ def main(): if fig: st.plotly_chart(fig, width='stretch') - # Top client IPs + # Top 10 Client IPs if analytics['slowness'].get('top_request_ips'): - st.subheader("Top Client IPs by Request Volume") + st.subheader("Top 10 Client IPs by Request Volume") st.markdown("**Identifying clients generating the most traffic can help detect bots, crawlers, or potential abuse.**") - + + top_ips = analytics['slowness']['top_request_ips'] + # Get top 10 + top_10_ips = dict(sorted(top_ips.items(), key=lambda x: x[1], reverse=True)[:10]) + col1, col2 = st.columns([2, 1]) - + with col1: - top_ips = analytics['slowness']['top_request_ips'] fig = create_bar_chart( - top_ips, - "Top Client IPs", + top_10_ips, + "Top 10 Client IPs", "IP Address", "Request Count", - limit=15 + limit=10 ) if fig: st.plotly_chart(fig, width='stretch') - + with col2: - # Table with top IPs - df_ips = pd.DataFrame({ - 'IP Address': list(top_ips.keys())[:20], - 'Requests': list(top_ips.values())[:20] - }) + # Table with top 10 IPs - include user agent if available + if analytics['slowness'].get('top_request_ips_with_ua'): + ip_ua_data = analytics['slowness']['top_request_ips_with_ua'] + df_ips = pd.DataFrame([ + { + 'IP Address': ip, + 'Requests': data['request_count'], + 'Top User Agent': data['top_user_agent'][:80] if len(data['top_user_agent']) > 80 else data['top_user_agent'] # Truncate long UAs + } + for ip, data in ip_ua_data.items() + ]) + else: + # Fallback if user agent data not available + df_ips = pd.DataFrame({ + 'IP Address': list(top_10_ips.keys()), + 'Requests': list(top_10_ips.values()) + }) st.dataframe(df_ips, width='stretch', hide_index=True) + # Top 10 IPs by Request Rate (requests per minute) + if analytics['slowness'].get('top_ips_by_request_rate'): + st.subheader("Top 10 Client IPs by Request Rate (Requests per Minute)") + st.markdown("**IPs with high request rates may indicate automated traffic, bots, or potential abuse.**") + + ip_rates = analytics['slowness']['top_ips_by_request_rate'] + + col1, col2 = st.columns([2, 1]) + + with col1: + # Create chart with requests per minute + rate_data = {ip: data['requests_per_minute'] for ip, data in ip_rates.items()} + fig = create_bar_chart( + rate_data, + "Top 10 IPs by Request Rate", + "IP Address", + "Requests per Minute", + limit=10 + ) + if fig: + st.plotly_chart(fig, width='stretch') + + with col2: + # Table with detailed metrics + df_rates = pd.DataFrame([ + { + 'IP Address': ip, + 'Req/min': f"{data['requests_per_minute']:.2f}", + 'Total Requests': int(data['total_requests']), + 'Time Span (min)': f"{data['time_span_minutes']:.1f}" + } + for ip, data in ip_rates.items() + ]) + st.dataframe(df_rates, width='stretch', hide_index=True) + if __name__ == "__main__": main() diff --git a/scripts/analyze_logs.py b/scripts/analyze_logs.py index 
6307659..37ddbca 100755 --- a/scripts/analyze_logs.py +++ b/scripts/analyze_logs.py @@ -8,6 +8,7 @@ import csv import argparse import sys +import random from pathlib import Path from collections import Counter, defaultdict from datetime import datetime, timedelta, timezone @@ -16,21 +17,149 @@ import numpy as np -def load_data(input_path: Path) -> List[Dict]: - """Load parsed log data from JSON or CSV file.""" +def load_data_chunked(input_path: Path, chunk_size: int = 100000): + """ + Load parsed log data in chunks (memory-efficient for large files). + + Args: + input_path: Path to input file + chunk_size: Number of entries to load per chunk + + Yields: + Chunks of log entries + """ if input_path.suffix == '.json': + # For JSON, we need to parse incrementally + # This is a simplified approach - for very large files, consider using ijson with open(input_path, 'r') as f: - return json.load(f) + data = json.load(f) + if not isinstance(data, list): + raise ValueError("JSON file must contain a list of entries") + + # Yield in chunks + for i in range(0, len(data), chunk_size): + yield data[i:i + chunk_size] + elif input_path.suffix == '.csv': + # For CSV, use pandas chunking + for chunk in pd.read_csv(input_path, chunksize=chunk_size): + # Convert query_params string back to dict if needed + if 'query_params' in chunk.columns: + chunk['query_params'] = chunk['query_params'].apply( + lambda x: json.loads(x) if isinstance(x, str) else {} + ) + yield chunk.to_dict('records') + else: + raise ValueError(f"Unsupported file format: {input_path.suffix}") + + +def load_data(input_path: Path, use_chunked: bool = False) -> List[Dict]: + """ + Load parsed log data from JSON or CSV file. + + Args: + input_path: Path to input file + sample_size: If specified, randomly sample this many entries + sample_percent: If specified, randomly sample this percentage of entries (0.0-1.0) + + Returns: + List of log entries + """ + if use_chunked: + # Use chunked processing - collect all chunks + all_entries = [] + for chunk in load_data_chunked(input_path): + all_entries.extend(chunk) + entries = all_entries + elif input_path.suffix == '.json': + # For large JSON files, check size and use chunked if needed + file_size = input_path.stat().st_size + file_size_mb = file_size / (1024 * 1024) + + if file_size_mb > 500: # If file is > 500MB, use chunked loading + print(f"Large file detected ({file_size_mb:.1f} MB). Using chunked processing...", file=sys.stderr) + all_entries = [] + for chunk in load_data_chunked(input_path): + all_entries.extend(chunk) + entries = all_entries + else: + with open(input_path, 'r') as f: + entries = json.load(f) elif input_path.suffix == '.csv': - df = pd.read_csv(input_path) + # For CSV, use pandas with chunking for large files + file_size = input_path.stat().st_size + file_size_mb = file_size / (1024 * 1024) + + if file_size_mb > 500: + print(f"Warning: Large file detected ({file_size_mb:.1f} MB). 
Loading in chunks...", file=sys.stderr) + # Read in chunks and combine + chunks = [] + for chunk in pd.read_csv(input_path, chunksize=100000): + chunks.append(chunk) + df = pd.concat(chunks, ignore_index=True) + else: + df = pd.read_csv(input_path) + # Convert query_params string back to dict if needed if 'query_params' in df.columns: df['query_params'] = df['query_params'].apply( lambda x: json.loads(x) if isinstance(x, str) else {} ) - return df.to_dict('records') + entries = df.to_dict('records') else: raise ValueError(f"Unsupported file format: {input_path.suffix}") + + return entries + + +def filter_by_time(entries: List[Dict], last_hours: float) -> List[Dict]: + """ + Filter entries to only include those from the last N hours. + + Args: + entries: List of log entries + last_hours: Number of hours to look back (e.g., 1.0 for last hour) + + Returns: + Filtered list of entries + """ + if not entries or last_hours is None: + return entries + + # Get current time in UTC + now = datetime.now(timezone.utc) + cutoff_time = now - timedelta(hours=last_hours) + + filtered = [] + for entry in entries: + timestamp_str = entry.get('timestamp') + if not timestamp_str: + continue + + try: + # Parse timestamp (handle both ISO format and other formats) + if isinstance(timestamp_str, str): + # Try ISO format first + try: + entry_time = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + except ValueError: + # Try other common formats + try: + entry_time = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S') + # Assume UTC if no timezone info + entry_time = entry_time.replace(tzinfo=timezone.utc) + except ValueError: + continue + else: + continue + + # Filter entries within the time window + if entry_time >= cutoff_time: + filtered.append(entry) + except (ValueError, TypeError, AttributeError): + # Skip entries with invalid timestamps + continue + + return filtered def filter_by_time(entries: List[Dict], last_hours: float) -> List[Dict]: @@ -364,7 +493,8 @@ def analyze_slowness_patterns(entries: List[Dict]) -> Dict: if len(df_with_cache) > 0: df_with_cache['hour'] = df_with_cache['timestamp'].dt.hour cache_miss_by_hour = df_with_cache.groupby('hour').apply( - lambda x: (x['cache_status'] == 'miss').sum() / len(x) * 100 + lambda x: (x['cache_status'] == 'miss').sum() / len(x) * 100, + include_groups=False ).to_dict() results['cache_miss_rate_by_hour'] = {int(k): float(v) for k, v in cache_miss_by_hour.items()} @@ -400,7 +530,8 @@ def analyze_slowness_patterns(entries: List[Dict]) -> Dict: if len(df_with_status) > 0: # Endpoints with high error rates (might be slow/failing) error_rates = df_with_status.groupby('path').apply( - lambda x: (x['status_code'] >= 400).sum() / len(x) * 100 + lambda x: (x['status_code'] >= 400).sum() / len(x) * 100, + include_groups=False ).sort_values(ascending=False).head(20).to_dict() results['high_error_rate_endpoints'] = {k: float(v) for k, v in error_rates.items()} @@ -453,6 +584,94 @@ def analyze_slowness_patterns(entries: List[Dict]) -> Dict: # Top IPs by request volume (might indicate bots/crawlers causing load) top_ips = df_with_ip['ip_address'].value_counts().head(20).to_dict() results['top_request_ips'] = {k: int(v) for k, v in top_ips.items()} + + # Get user agent info for top IPs + if 'user_agent' in df_with_ip.columns: + top_ips_with_ua = {} + for ip in list(top_ips.keys())[:10]: # Top 10 IPs + ip_df = df_with_ip[df_with_ip['ip_address'] == ip] + if 'user_agent' in ip_df.columns: + # Get most common user agent for this IP + ua_counts = 
ip_df['user_agent'].dropna().value_counts() + if len(ua_counts) > 0: + top_ua = ua_counts.index[0] + top_ua_count = int(ua_counts.iloc[0]) + total_for_ip = len(ip_df) + ua_percentage = (top_ua_count / total_for_ip * 100) if total_for_ip > 0 else 0 + + # If multiple user agents, show count + unique_ua_count = len(ua_counts) + if unique_ua_count > 1: + top_ua_display = f"{top_ua} ({unique_ua_count} unique UAs)" + else: + top_ua_display = top_ua + + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': top_ua_display, + 'top_ua_count': top_ua_count, + 'top_ua_percentage': float(ua_percentage), + 'unique_ua_count': int(unique_ua_count) + } + else: + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': 'Unknown', + 'top_ua_count': 0, + 'top_ua_percentage': 0.0, + 'unique_ua_count': 0 + } + else: + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': 'N/A', + 'top_ua_count': 0, + 'top_ua_percentage': 0.0, + 'unique_ua_count': 0 + } + results['top_request_ips_with_ua'] = top_ips_with_ua + + # Requests per minute by IP (rate-based analysis) + if 'timestamp' in df_with_ip.columns: + df_with_ip['timestamp'] = pd.to_datetime(df_with_ip['timestamp'], errors='coerce') + df_with_ip = df_with_ip[df_with_ip['timestamp'].notna()] + if len(df_with_ip) > 0: + ip_rates = {} + for ip in df_with_ip['ip_address'].unique(): + ip_df = df_with_ip[df_with_ip['ip_address'] == ip] + if len(ip_df) > 1: + # Calculate time span + min_time = ip_df['timestamp'].min() + max_time = ip_df['timestamp'].max() + time_span = (max_time - min_time).total_seconds() / 60.0 # Convert to minutes + + # Calculate requests per minute + if time_span > 0: + requests_per_min = len(ip_df) / time_span + else: + # If all requests are at the same time, use 1 minute as minimum + requests_per_min = len(ip_df) / 1.0 + + ip_rates[ip] = { + 'requests_per_minute': float(requests_per_min), + 'total_requests': int(len(ip_df)), + 'time_span_minutes': float(time_span) if time_span > 0 else 1.0 + } + else: + # Single request - assume 1 minute span + ip_rates[ip] = { + 'requests_per_minute': float(len(ip_df)), + 'total_requests': int(len(ip_df)), + 'time_span_minutes': 1.0 + } + + # Sort by requests per minute and get top 10 + top_ips_by_rate = dict(sorted( + ip_rates.items(), + key=lambda x: x[1]['requests_per_minute'], + reverse=True + )[:10]) + results['top_ips_by_request_rate'] = top_ips_by_rate # 7. 
User agent patterns (certain clients might be slower) if 'user_agent' in df.columns and 'response_size' in df.columns: @@ -609,6 +828,15 @@ def generate_report(analytics: Dict, output_format: str, output_path: Optional[P print("\n### Top Request IPs (might indicate bots/crawlers)") for ip, count in list(slow['top_request_ips'].items())[:10]: print(f" {ip}: {count:,} requests") + + # Top IPs by request rate + if 'top_ips_by_request_rate' in slow: + print("\n### Top IPs by Request Rate (requests per minute)") + for ip, data in list(slow['top_ips_by_request_rate'].items())[:10]: + rate = data['requests_per_minute'] + total = data['total_requests'] + time_span = data['time_span_minutes'] + print(f" {ip}: {rate:.2f} req/min ({total:,} total requests over {time_span:.1f} min)") print("\n" + "="*80) @@ -653,9 +881,71 @@ def main(): print(f"Error: Input file does not exist: {input_path}", file=sys.stderr) sys.exit(1) - print(f"Loading data from {input_path}...") - entries = load_data(input_path) - print(f"Loaded {len(entries):,} log entries") + # Check file size to determine processing strategy + file_size = input_path.stat().st_size + file_size_mb = file_size / (1024 * 1024) + use_chunked = file_size_mb > 200 # Use chunked processing for files > 200MB + + if use_chunked: + print(f"Large file detected ({file_size_mb:.1f} MB). Processing in chunks (memory-efficient)...") + + # Process in chunks and accumulate results + all_entries = [] + chunk_count = 0 + total_entries = 0 + + try: + for chunk in load_data_chunked(input_path, chunk_size=50000): + chunk_count += 1 + total_entries += len(chunk) + + # Apply time filter if specified + if args.last_hours: + chunk = filter_by_time(chunk, args.last_hours) + + if chunk: + all_entries.extend(chunk) + + # Progress indicator + if chunk_count % 10 == 0: + print(f" Processed {chunk_count} chunks ({total_entries:,} entries so far)...", end='\r', file=sys.stderr) + + print(f"\nLoaded {len(all_entries):,} log entries from {total_entries:,} total", file=sys.stderr) + entries = all_entries + + except MemoryError: + print(f"\nError: Out of memory. 
File is too large to process.", file=sys.stderr) + print(f"Consider using --last-hours to filter to recent entries only.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"\nError loading data: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) + else: + print(f"Loading data from {input_path}...") + try: + entries = load_data(input_path, use_chunked=False) + print(f"Loaded {len(entries):,} log entries") + except MemoryError: + print(f"Error: Out of memory loading file.", file=sys.stderr) + print(f"Try using --last-hours to filter to recent entries only.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Error loading data: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) + + # Apply time filter if specified + if args.last_hours: + print(f"Filtering to last {args.last_hours} hour(s)...") + entries = filter_by_time(entries, args.last_hours) + print(f"Filtered to {len(entries):,} log entries") + + if not entries: + print(f"Error: No entries to analyze after filtering", file=sys.stderr) + sys.exit(1) # Apply time filter if specified if args.last_hours: @@ -664,14 +954,24 @@ def main(): print(f"Filtered to {len(entries):,} log entries") print("Generating analytics...") - analytics = { - 'traffic': analyze_traffic_patterns(entries), - 'errors': analyze_errors(entries), - 'performance': analyze_performance(entries), - 'user_agents': analyze_user_agents(entries), - 'query_patterns': analyze_query_patterns(entries), - 'slowness_investigation': analyze_slowness_patterns(entries) - } + try: + analytics = { + 'traffic': analyze_traffic_patterns(entries), + 'errors': analyze_errors(entries), + 'performance': analyze_performance(entries), + 'user_agents': analyze_user_agents(entries), + 'query_patterns': analyze_query_patterns(entries), + 'slowness_investigation': analyze_slowness_patterns(entries) + } + except MemoryError: + print(f"Error: Out of memory during analysis.", file=sys.stderr) + print(f"Try using --last-hours to filter to recent entries only.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Error during analysis: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) output_path = Path(args.output) if args.output else None generate_report(analytics, args.format, output_path) diff --git a/scripts/parse_logs.py b/scripts/parse_logs.py index 5e97e7e..c1e4ec1 100755 --- a/scripts/parse_logs.py +++ b/scripts/parse_logs.py @@ -302,42 +302,92 @@ def main(): print(f"No log files found in {input_dir}", file=sys.stderr) sys.exit(1) - print(f"Found {len(log_files)} log file(s)") - print("Parsing logs (lazy/streaming mode - memory efficient)...") + # Determine output path + if args.output: + output_path = Path(args.output) + else: + output_path = Path(args.input_dir) / f"parsed_logs.{args.format}" - # Generator function that yields all entries from all files - total_count = 0 - def all_entries_generator(): - nonlocal total_count + # Check for incremental parsing - only process files newer than output + existing_entries = [] + files_to_process = [] + output_mtime = output_path.stat().st_mtime if output_path.exists() else 0 + + if output_path.exists(): + print(f"Found existing parsed file: {output_path}") + print(f"Checking which files need to be (re)parsed...") + + # Load existing entries if we're doing incremental parsing + try: + if args.format == 'json': + with open(output_path, 'r') as f: + existing_entries = json.load(f) + print(f" Loaded 
{len(existing_entries):,} existing entries") + # For CSV, we'd need to load it differently, but JSON is the default + except Exception as e: + print(f" Warning: Could not load existing file: {e}") + print(f" Will reparse all files") + existing_entries = [] + + # Check which files need processing for log_file in sorted(log_files): + file_mtime = log_file.stat().st_mtime + if file_mtime > output_mtime: + files_to_process.append(log_file) + else: + print(f" Skipping {log_file.name} (already parsed)") + else: + # No existing file, process all + files_to_process = log_files + print(f"Found {len(log_files)} log file(s) to parse") + + if not files_to_process: + print(f"All files already parsed. No new files to process.") + print(f"Total entries: {len(existing_entries):,}") + return + + print(f"Parsing {len(files_to_process)} new/changed file(s)...") + print("Parsing logs (lazy/streaming mode - memory efficient)...") + + # Generator function that yields entries from new files + new_count = 0 + def new_entries_generator(): + nonlocal new_count + for log_file in sorted(files_to_process): print(f" Processing: {log_file.name}") file_count = 0 for entry in process_log_file(log_file): file_count += 1 - total_count += 1 + new_count += 1 yield entry print(f" Parsed {file_count} entries") - # Determine output path - if args.output: - output_path = Path(args.output) + # Collect new entries + new_entries = list(new_entries_generator()) + + # Merge with existing entries + if existing_entries: + print(f"Merging {len(new_entries):,} new entries with {len(existing_entries):,} existing entries...") + all_entries = existing_entries + new_entries else: - output_path = Path(args.input_dir) / f"parsed_logs.{args.format}" + all_entries = new_entries - # Check if output file exists and warn - if output_path.exists(): - print(f"Warning: Output file already exists: {output_path}") - print(f" It will be OVERWRITTEN with new data.") - print(f" (Previous data will be lost)") - - # Save output using streaming (overwrites existing file) - print(f"Saving to {output_path}...") + # Ensure output directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Save merged output + print(f"Saving {len(all_entries):,} total entries to {output_path}...") if args.format == 'json': - save_json_streaming(all_entries_generator(), output_path) + with open(output_path, 'w') as f: + json.dump(all_entries, f, indent=2) else: - save_csv_streaming(all_entries_generator(), output_path) + save_csv_streaming(all_entries, output_path) - print(f"\nTotal entries parsed: {total_count}") + print(f"\nTotal entries parsed: {len(all_entries):,}") + if new_entries: + print(f" New entries: {len(new_entries):,}") + if existing_entries: + print(f" Existing entries: {len(existing_entries):,}") print("Done!") diff --git a/scripts/query_logs.py b/scripts/query_logs.py index 59b9be6..ff242b9 100755 --- a/scripts/query_logs.py +++ b/scripts/query_logs.py @@ -167,9 +167,9 @@ def main(): %(prog)s --operation analyze --parsed-output ./logs/srv_quakesearch-fastly/parsed/parsed_logs.json """ ) - parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD format") - parser.add_argument("--end-date", type=str, help="End date in YYYY-MM-DD format") - parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD format (syncs from this date to today)") + parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD or YYYY-MM-DDTHH format (e.g., 2025-11-25 or 2025-11-25T00)") + parser.add_argument("--end-date", 
type=str, help="End date in YYYY-MM-DD or YYYY-MM-DDTHH format (e.g., 2025-11-25 or 2025-11-25T23)") + parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD or YYYY-MM-DDTHH format (syncs from this date to today)") parser.add_argument( "--operation", choices=["sync", "parse", "analyze", "all"], diff --git a/scripts/sync_logs.py b/scripts/sync_logs.py index dbcfa9f..273a9ae 100755 --- a/scripts/sync_logs.py +++ b/scripts/sync_logs.py @@ -43,9 +43,9 @@ def main(): %(prog)s --config custom_config.yaml --source srv_quakesearch-fastly --date 2025-11-10 """ ) - parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD format") - parser.add_argument("--end-date", type=str, help="End date in YYYY-MM-DD format") - parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD format (syncs from this date to today)") + parser.add_argument("--start-date", type=str, help="Start date in YYYY-MM-DD or YYYY-MM-DDTHH format (e.g., 2025-11-25 or 2025-11-25T00)") + parser.add_argument("--end-date", type=str, help="End date in YYYY-MM-DD or YYYY-MM-DDTHH format (e.g., 2025-11-25 or 2025-11-25T23)") + parser.add_argument("--date", type=str, help="Start date in YYYY-MM-DD or YYYY-MM-DDTHH format (syncs from this date to today)") parser.add_argument("--source", type=str, help="Specific log source to sync (from config)") parser.add_argument("--all-sources", action="store_true", help="Sync all enabled sources") parser.add_argument("--list-sources", action="store_true", help="List all available log sources") diff --git a/src/analyze/analytics.py b/src/analyze/analytics.py index e19af59..6d2859f 100755 --- a/src/analyze/analytics.py +++ b/src/analyze/analytics.py @@ -536,6 +536,94 @@ def analyze_slowness_patterns(entries: List[Dict]) -> Dict: # Top IPs by request volume (might indicate bots/crawlers causing load) top_ips = df_with_ip['ip_address'].value_counts().head(20).to_dict() results['top_request_ips'] = {k: int(v) for k, v in top_ips.items()} + + # Get user agent info for top IPs + if 'user_agent' in df_with_ip.columns: + top_ips_with_ua = {} + for ip in list(top_ips.keys())[:10]: # Top 10 IPs + ip_df = df_with_ip[df_with_ip['ip_address'] == ip] + if 'user_agent' in ip_df.columns: + # Get most common user agent for this IP + ua_counts = ip_df['user_agent'].dropna().value_counts() + if len(ua_counts) > 0: + top_ua = ua_counts.index[0] + top_ua_count = int(ua_counts.iloc[0]) + total_for_ip = len(ip_df) + ua_percentage = (top_ua_count / total_for_ip * 100) if total_for_ip > 0 else 0 + + # If multiple user agents, show count + unique_ua_count = len(ua_counts) + if unique_ua_count > 1: + top_ua_display = f"{top_ua} ({unique_ua_count} unique UAs)" + else: + top_ua_display = top_ua + + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': top_ua_display, + 'top_ua_count': top_ua_count, + 'top_ua_percentage': float(ua_percentage), + 'unique_ua_count': int(unique_ua_count) + } + else: + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': 'Unknown', + 'top_ua_count': 0, + 'top_ua_percentage': 0.0, + 'unique_ua_count': 0 + } + else: + top_ips_with_ua[ip] = { + 'request_count': int(top_ips[ip]), + 'top_user_agent': 'N/A', + 'top_ua_count': 0, + 'top_ua_percentage': 0.0, + 'unique_ua_count': 0 + } + results['top_request_ips_with_ua'] = top_ips_with_ua + + # Requests per minute by IP (rate-based analysis) + if 'timestamp' in df_with_ip.columns: + df_with_ip['timestamp'] = pd.to_datetime(df_with_ip['timestamp'], 
errors='coerce') + df_with_ip = df_with_ip[df_with_ip['timestamp'].notna()] + if len(df_with_ip) > 0: + ip_rates = {} + for ip in df_with_ip['ip_address'].unique(): + ip_df = df_with_ip[df_with_ip['ip_address'] == ip] + if len(ip_df) > 1: + # Calculate time span + min_time = ip_df['timestamp'].min() + max_time = ip_df['timestamp'].max() + time_span = (max_time - min_time).total_seconds() / 60.0 # Convert to minutes + + # Calculate requests per minute + if time_span > 0: + requests_per_min = len(ip_df) / time_span + else: + # If all requests are at the same time, use 1 minute as minimum + requests_per_min = len(ip_df) / 1.0 + + ip_rates[ip] = { + 'requests_per_minute': float(requests_per_min), + 'total_requests': int(len(ip_df)), + 'time_span_minutes': float(time_span) if time_span > 0 else 1.0 + } + else: + # Single request - assume 1 minute span + ip_rates[ip] = { + 'requests_per_minute': float(len(ip_df)), + 'total_requests': int(len(ip_df)), + 'time_span_minutes': 1.0 + } + + # Sort by requests per minute and get top 10 + top_ips_by_rate = dict(sorted( + ip_rates.items(), + key=lambda x: x[1]['requests_per_minute'], + reverse=True + )[:10]) + results['top_ips_by_request_rate'] = top_ips_by_rate # 7. User agent patterns (certain clients might be slower) if 'user_agent' in df.columns and 'response_size' in df.columns: diff --git a/src/sync/s3_sync.py b/src/sync/s3_sync.py index 7a788fb..7671f9f 100644 --- a/src/sync/s3_sync.py +++ b/src/sync/s3_sync.py @@ -202,26 +202,45 @@ def sync(self, start_date: str, end_date: str, max_workers: int = 10) -> Tuple[i print(f"{Colors.BLUE}Local Directory: {self.local_dir}{Colors.NC}") print(f"{Colors.GREEN}{'='*60}{Colors.NC}\n") - # Validate dates - start_dt = datetime.strptime(start_date, "%Y-%m-%d") - end_dt = datetime.strptime(end_date, "%Y-%m-%d") - - # Generate date range - current_date = start_dt + # Parse dates - support both date and datetime formats + from src.utils.date_utils import parse_datetime + + start_dt = parse_datetime(start_date) + end_dt = parse_datetime(end_date) + + # Check if hour-level precision is used + has_hour_precision = 'T' in start_date or 'T' in end_date + + # Generate date/hour range + current_dt = start_dt total_downloads = 0 total_skips = 0 total_errors = 0 - while current_date <= end_dt: - date_str = current_date.strftime("%Y-%m-%d") - downloads, skips, errors = self._sync_date(date_str, max_workers) + if has_hour_precision: + # Iterate hour by hour + while current_dt <= end_dt: + date_str = current_dt.strftime("%Y-%m-%dT%H") + downloads, skips, errors = self._sync_date(date_str, max_workers) + + total_downloads += downloads + total_skips += skips + total_errors += errors + + # Move to next hour + current_dt += timedelta(hours=1) + else: + # Iterate day by day (original behavior) + while current_dt <= end_dt: + date_str = current_dt.strftime("%Y-%m-%d") + downloads, skips, errors = self._sync_date(date_str, max_workers) - total_downloads += downloads - total_skips += skips - total_errors += errors + total_downloads += downloads + total_skips += skips + total_errors += errors - # Move to next date - current_date += timedelta(days=1) + # Move to next date + current_dt += timedelta(days=1) print(f"\n{Colors.GREEN}Source '{self.source_name}' sync completed!{Colors.NC}") print(f" New files downloaded: {total_downloads}") diff --git a/src/utils/date_utils.py b/src/utils/date_utils.py index ab30a53..67122de 100644 --- a/src/utils/date_utils.py +++ b/src/utils/date_utils.py @@ -12,13 +12,39 @@ class Colors: NC = '\033[0m' # 
No Color def validate_date(date_str: str) -> None: - """Validate date format.""" + """Validate date format (supports YYYY-MM-DD or YYYY-MM-DDTHH).""" + # Try date format first try: datetime.strptime(date_str, "%Y-%m-%d") + return except ValueError: - print(f"{Colors.RED}Error: Invalid date format: {date_str}{Colors.NC}", file=sys.stderr) - print("Date must be in YYYY-MM-DD format", file=sys.stderr) - sys.exit(1) + pass + + # Try datetime format with hour + try: + datetime.strptime(date_str, "%Y-%m-%dT%H") + return + except ValueError: + pass + + print(f"{Colors.RED}Error: Invalid date format: {date_str}{Colors.NC}", file=sys.stderr) + print("Date must be in YYYY-MM-DD or YYYY-MM-DDTHH format (e.g., 2025-11-25 or 2025-11-25T00)", file=sys.stderr) + sys.exit(1) + +def parse_datetime(date_str: str) -> datetime: + """Parse date string to datetime object, supporting both date and datetime formats.""" + # Try datetime format with hour first + try: + return datetime.strptime(date_str, "%Y-%m-%dT%H") + except ValueError: + pass + + # Try date format + try: + return datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + raise ValueError(f"Invalid date format: {date_str}") + def parse_date_range(start_date: Optional[str] = None, end_date: Optional[str] = None, @@ -27,8 +53,8 @@ def parse_date_range(start_date: Optional[str] = None, Parse date range from arguments. Args: - start_date: Start date in YYYY-MM-DD format - end_date: End date in YYYY-MM-DD format + start_date: Start date in YYYY-MM-DD or YYYY-MM-DDTHH format + end_date: End date in YYYY-MM-DD or YYYY-MM-DDTHH format single_date: Single date (will sync from this date to today) Returns: @@ -37,11 +63,20 @@ def parse_date_range(start_date: Optional[str] = None, if single_date: start_date = single_date # Set end date to today in UTC + # If start_date has hour precision, end_date should also have hour precision + has_hour_precision = 'T' in single_date try: from datetime import timezone - end_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") + now = datetime.now(timezone.utc) except ImportError: - end_date = datetime.utcnow().strftime("%Y-%m-%d") + now = datetime.utcnow() + + if has_hour_precision: + # Use current hour for end date + end_date = now.strftime("%Y-%m-%dT%H") + else: + # Use just the date for end date + end_date = now.strftime("%Y-%m-%d") if not start_date or not end_date: raise ValueError("Start date and end date are required") @@ -50,9 +85,10 @@ def parse_date_range(start_date: Optional[str] = None, validate_date(start_date) validate_date(end_date) - # Validate date range - start_dt = datetime.strptime(start_date, "%Y-%m-%d") - end_dt = datetime.strptime(end_date, "%Y-%m-%d") + # Parse to datetime for comparison + start_dt = parse_datetime(start_date) + end_dt = parse_datetime(end_date) + if start_dt > end_dt: print(f"{Colors.RED}Error: Start date must be before or equal to end date{Colors.NC}", file=sys.stderr) sys.exit(1)
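
Notes:

A minimal sketch of the hour-precision date handling this patch adds in src/utils/date_utils.py, assuming the repository root is on PYTHONPATH (the scripts under scripts/ import from src/ the same way):

    from src.utils.date_utils import validate_date, parse_datetime

    validate_date("2025-11-25T06")         # accepts the new YYYY-MM-DDTHH form; exits with an error otherwise
    start = parse_datetime("2025-11-25")   # -> datetime(2025, 11, 25, 0, 0); date-only input parses to midnight
    end = parse_datetime("2025-11-25T06")  # -> datetime(2025, 11, 25, 6, 0)
    assert start <= end                    # the same comparison parse_date_range() and the sync code perform

With hour precision, a sync can be limited to part of a day; the source name and hours below are illustrative only:

    python scripts/sync_logs.py --source srv_quakesearch-fastly --start-date 2025-11-25T00 --end-date 2025-11-25T06

sync() in src/sync/s3_sync.py iterates hour by hour whenever either bound contains a "T", and day by day otherwise.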