-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecurity_logging.py
More file actions
361 lines (283 loc) · 11.4 KB
/
security_logging.py
File metadata and controls
361 lines (283 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""Secure Logging Module - OWASP A02:2021 Cryptographic Failures Fix
This module provides secure, structured logging for the PISC application.
It ensures sensitive data is never logged while maintaining audit trails.
Security Principles (OWASP):
- Never log: API keys, full prompts, user credentials, PII
- Always log: Timestamps, log levels, event types, correlation IDs
- Use appropriate log levels: INFO (normal), WARNING (suspicious), ERROR (failures)
"""
import logging
import hashlib
import json
import uuid
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Callable, Dict, Optional
from logging.handlers import RotatingFileHandler
import os
from pathlib import Path
# =============================================================================
# Sensitive Data Patterns - Never Log These
# =============================================================================
# Patterns for sensitive data that should never be logged
# Lowercase fragments that mark a dictionary key as sensitive. A key is
# redacted when its lowercased form contains any one of these fragments.
SENSITIVE_PATTERNS = [
    # API keys
    "api_key",
    "apikey",
    # Passwords and generic secrets
    "password",
    "secret",
    # Tokens, auth headers and credentials
    "token",
    "authorization",
    "credential",
    # Key material and OAuth-style tokens
    "private_key",
    "access_token",
    "refresh_token",
]
def _contains_sensitive_key(key: str) -> bool:
    """Check whether a key name looks like it holds sensitive data.

    The match is case-insensitive and ignores separators, so spellings such
    as ``x-api-key``, ``privateKey`` or ``Access Token`` are caught as well
    as the literal entries of ``SENSITIVE_PATTERNS``.

    Args:
        key: Dictionary key to inspect. Non-string keys are converted with
            ``str()`` instead of raising.

    Returns:
        True if the key contains any sensitive pattern, False otherwise.
    """

    def _squash(text: str) -> str:
        # Keep only letters and digits so "api-key", "api_key" and "apiKey"
        # all collapse to the same form.
        return "".join(ch for ch in text if ch.isalnum())

    key_lower = str(key).lower()

    # Plain substring match against the patterns as written.
    if any(pattern in key_lower for pattern in SENSITIVE_PATTERNS):
        return True

    # Separator-insensitive match; only ever flags more keys than the plain
    # match above, never fewer.
    squashed_key = _squash(key_lower)
    for pattern in SENSITIVE_PATTERNS:
        squashed_pattern = _squash(pattern.lower())
        # An empty squashed pattern would match every key; skip it.
        if squashed_pattern and squashed_pattern in squashed_key:
            return True
    return False
def _hash_prompt(prompt: str, preview_length: int = 80) -> str:
    """Build a short, loggable fingerprint of a prompt.

    Args:
        prompt: The full prompt text
        preview_length: Maximum number of leading characters kept verbatim

    Returns:
        The leading characters of the prompt (followed by "..." when the
        prompt is longer than the preview), then the first 8 hex digits of
        the prompt's SHA-256 digest in a "[hash:...]" tag.
    """
    head = prompt[:preview_length]
    # Mark the preview as cut off only when something was actually dropped.
    ellipsis = "..." if len(prompt) > preview_length else ""
    # The digest covers the whole prompt, not just the preview.
    digest = hashlib.sha256(prompt.encode()).hexdigest()
    return f"{head}{ellipsis} [hash:{digest[:8]}]"
def sanitize_for_logging(data: Dict[str, Any]) -> Dict[str, Any]:
    """Sanitize a dictionary to remove sensitive data before logging.

    Values stored under sensitive-looking keys are replaced with
    "[REDACTED]". Nested dictionaries are sanitized recursively, including
    dictionaries that sit inside lists or tuples, and strings longer than
    200 characters are truncated.

    Args:
        data: Dictionary potentially containing sensitive data

    Returns:
        A new sanitized dictionary safe for logging. If ``data`` is not a
        dictionary it is returned unchanged.
    """
    if not isinstance(data, dict):
        return data

    def _clean(value: Any) -> Any:
        # Sanitize one value, descending into containers.
        if isinstance(value, dict):
            return sanitize_for_logging(value)
        if isinstance(value, list):
            # Dicts inside sequences must be scrubbed too, otherwise
            # {"users": [{"password": ...}]} would leak the password.
            return [_clean(item) for item in value]
        if isinstance(value, tuple):
            return tuple(_clean(item) for item in value)
        if isinstance(value, str) and len(value) > 200:
            # Truncate very long strings (potential prompts)
            return value[:200] + "...[truncated]"
        return value

    sanitized = {}
    for key, value in data.items():
        # Only string keys can name a secret; the isinstance guard also keeps
        # non-string keys (ints, tuples, ...) from raising in the key check.
        if isinstance(key, str) and _contains_sensitive_key(key):
            sanitized[key] = "[REDACTED]"
        else:
            sanitized[key] = _clean(value)
    return sanitized
# =============================================================================
# Secure Logger Configuration
# =============================================================================
class SecureJSONFormatter(logging.Formatter):
    """Formatter that renders log records as one JSON object per line.

    Structured data attached to a record as ``extra_data`` is passed through
    ``sanitize_for_logging`` before it is written. The log message itself is
    emitted as given, so callers must not put secrets in the message text.
    """

    def __init__(self, include_timestamp: bool = True):
        """Initialize the formatter.

        Args:
            include_timestamp: When True (default) each entry carries a
                "timestamp" field; when False the field is omitted.
        """
        super().__init__()
        self.include_timestamp = include_timestamp

    def format(self, record: logging.LogRecord) -> str:
        """Format a log record as a JSON string.

        Args:
            record: The log record to render

        Returns:
            A JSON document with the record's level, logger, message and
            source location, plus the optional fields correlation_id,
            event_type, data and exception when present on the record.
        """
        log_entry = {}

        if self.include_timestamp:
            # Use the record's creation time, not "now": every handler then
            # reports the same instant for the same event, and the value is
            # the time of the event rather than the time of formatting.
            log_entry["timestamp"] = datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat()

        log_entry.update(
            {
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
            }
        )

        # Add correlation ID if present in record
        if hasattr(record, "correlation_id"):
            log_entry["correlation_id"] = record.correlation_id

        # Add event type if present
        if hasattr(record, "event_type"):
            log_entry["event_type"] = record.event_type

        # Add extra fields, sanitized
        if hasattr(record, "extra_data"):
            log_entry["data"] = sanitize_for_logging(record.extra_data)

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # default=str is deliberate: a value json cannot encode (datetime,
        # Path, set, ...) is written as its string form instead of raising,
        # because an exception here would lose the whole log entry.
        return json.dumps(log_entry, default=str)
class SecurityLogger:
    """Secure logger with built-in sanitization and correlation ID support.

    Wraps a standard ``logging.Logger`` and attaches a correlation ID, an
    event type and (optionally) sanitized structured data to every record.
    """

    def __init__(self, name: str = "pisc", log_file: Optional[str] = None):
        """Initialize the secure logger.

        Args:
            name: Logger name
            log_file: Optional file path for file logging
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        # Prevent log propagation to parent logger (causes duplicate entries)
        self.logger.propagate = False

        # logging.getLogger returns the same object for the same name, so a
        # second SecurityLogger(name) would otherwise stack handlers and
        # duplicate every entry. Remove the old handlers and close them so
        # any open log file is released rather than leaked.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        # Console handler with JSON formatter
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(SecureJSONFormatter())
        console_handler.setLevel(logging.INFO)
        self.logger.addHandler(console_handler)

        # File handler if specified
        if log_file:
            self._setup_file_handler(log_file)

    def _setup_file_handler(self, log_file: str):
        """Set up rotating file handler.

        Args:
            log_file: Path of the log file; missing parent directories are
                created.
        """
        # Create log directory if it doesn't exist
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)

        # Rotating file handler - max 10MB, keep 5 files
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5,
        )
        file_handler.setFormatter(SecureJSONFormatter())
        file_handler.setLevel(logging.DEBUG)
        self.logger.addHandler(file_handler)

    def _log(self, level: int, message: str, event_type: str, **kwargs):
        """Internal log method with correlation ID support.

        Args:
            level: Numeric logging level (e.g. ``logging.INFO``)
            message: Log message
            event_type: Type of event for categorization
            **kwargs: Recognized keys are ``correlation_id`` (generated when
                missing or empty), ``extra_data`` (dict, sanitized before
                logging), ``exc_info`` and ``stack_info`` (forwarded to the
                underlying logger). Any other key is ignored.
        """
        # Fall back to a fresh ID both when the key is absent and when the
        # caller passed None/"" explicitly, so entries never carry a null ID.
        correlation_id = kwargs.pop("correlation_id", None) or str(uuid.uuid4())[:8]

        extra_data = kwargs.pop("extra_data", None)
        # Forward exception/stack requests instead of dropping them; without
        # this the formatter's exception field could never be populated.
        exc_info = kwargs.pop("exc_info", None)
        stack_info = kwargs.pop("stack_info", False)

        extra = {
            "correlation_id": correlation_id,
            "event_type": event_type,
        }
        if extra_data:
            extra["extra_data"] = sanitize_for_logging(extra_data)

        # stacklevel=3 skips this method and the public wrapper (info,
        # warning, ...) so module/function/line describe the real caller.
        self.logger.log(
            level,
            message,
            extra=extra,
            stacklevel=3,
            exc_info=exc_info,
            stack_info=stack_info,
        )

    def info(self, message: str, event_type: str = "info", **kwargs):
        """Log info level message.

        Args:
            message: Log message
            event_type: Type of event for categorization
            **kwargs: Additional context (correlation_id, extra_data,
                exc_info, stack_info)
        """
        self._log(logging.INFO, message, event_type, **kwargs)

    def warning(self, message: str, event_type: str = "warning", **kwargs):
        """Log warning level message for suspicious activity.

        Args:
            message: Log message
            event_type: Type of event for categorization
            **kwargs: Additional context (correlation_id, extra_data,
                exc_info, stack_info)
        """
        self._log(logging.WARNING, message, event_type, **kwargs)

    def error(self, message: str, event_type: str = "error", **kwargs):
        """Log error level message for failures.

        Args:
            message: Log message
            event_type: Type of event for categorization
            **kwargs: Additional context (correlation_id, extra_data,
                exc_info, stack_info)
        """
        self._log(logging.ERROR, message, event_type, **kwargs)

    def debug(self, message: str, event_type: str = "debug", **kwargs):
        """Log debug level message for development.

        Args:
            message: Log message
            event_type: Type of event for categorization
            **kwargs: Additional context (correlation_id, extra_data,
                exc_info, stack_info)
        """
        self._log(logging.DEBUG, message, event_type, **kwargs)
# =============================================================================
# Scan-Specific Logging Helpers
# =============================================================================
def log_scan_event(logger: SecurityLogger, stage: str, status: str, **data):
    """Record one step of the scan pipeline at a level matching its status.

    "error" is logged at error level; "started", "completed" and "skipped"
    at info level; any other status at warning level with an "unknown"
    event-type suffix.

    Args:
        logger: The secure logger instance
        stage: Scan stage (regex_scan, risk_scoring, llm_classification, etc.)
        status: Status (started, completed, skipped, error)
        **data: Additional event data, logged alongside stage and status
    """
    # Pick the logger method and the event-type suffix together.
    if status == "error":
        emit = logger.error
        suffix = "error"
    elif status in ("started", "completed", "skipped"):
        emit = logger.info
        suffix = status
    else:
        # Unrecognized statuses are surfaced as warnings.
        emit = logger.warning
        suffix = "unknown"

    # Stage and status always lead the payload; caller data follows.
    payload = {"stage": stage, "status": status, **data}

    emit(
        f"Scan {stage}: {status}",
        event_type=f"scan_{stage}_{suffix}",
        extra_data=payload,
    )
def log_security_event(logger: SecurityLogger, event_type: str, severity: str, **data):
    """Record a security-related event at a level derived from its severity.

    "low" maps to info, "medium" and "high" to warning, "critical" to error.
    Severity is compared case-insensitively; unrecognized values fall back
    to info.

    Args:
        logger: The secure logger instance
        event_type: Type of security event
        severity: Severity level (low, medium, high, critical)
        **data: Additional event data
    """
    method_by_severity = {
        "low": "info",
        "medium": "warning",
        "high": "warning",
        "critical": "error",
    }
    method_name = method_by_severity.get(severity.lower(), "info")
    emit = getattr(logger, method_name)

    # The severity is logged exactly as the caller supplied it.
    payload = {"severity": severity, **data}
    emit(
        f"Security event: {event_type}",
        event_type=f"security_{event_type}",
        extra_data=payload,
    )
# =============================================================================
# Correlation ID Context Manager
# =============================================================================
class CorrelationContext:
    """Context manager for tracking correlation IDs across operations.

    The current ID is stored on the class, so it is shared by every user of
    the class in the process. It can be managed through the class methods
    or scoped with a ``with`` block:

        with CorrelationContext("req-42") as correlation_id:
            ...  # get_id() returns "req-42" in here

    On leaving the block the ID that was active before it is restored.
    """

    # Currently active correlation ID, or None when none has been set.
    _context_var: Optional[str] = None

    def __init__(self, correlation_id: Optional[str] = None):
        """Prepare a scoped correlation ID.

        Args:
            correlation_id: ID to activate inside the ``with`` block. When
                omitted, a new 8-character ID is generated on entry.
        """
        self._requested_id = correlation_id
        self._previous_id = None

    def __enter__(self) -> str:
        """Activate the scoped ID and return it."""
        owner = type(self)
        # Remember what was active so nested blocks unwind correctly.
        self._previous_id = owner._context_var
        owner.set_id(self._requested_id or str(uuid.uuid4())[:8])
        return owner.get_id()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Restore the ID that was active before the block.

        Returns None, so exceptions raised inside the block propagate.
        """
        type(self)._context_var = self._previous_id

    @classmethod
    def get_id(cls) -> str:
        """Get current correlation ID or create new one."""
        if cls._context_var is None:
            cls._context_var = str(uuid.uuid4())[:8]
        return cls._context_var

    @classmethod
    def set_id(cls, correlation_id: str):
        """Set a specific correlation ID."""
        cls._context_var = correlation_id

    @classmethod
    def clear_id(cls):
        """Clear the correlation ID."""
        cls._context_var = None
# =============================================================================
# Default Logger Instance
# =============================================================================
# Default secure logger instance, named "pisc".
# Can be imported and used throughout the application.
# It is constructed when this module is imported and no log_file is passed,
# so only the console handler is attached (no file logging).
default_logger = SecurityLogger("pisc")