From ad1707f28283b58ee1054b251a0b4c77d403ed9e Mon Sep 17 00:00:00 2001 From: Vishnu Jayadevan Date: Tue, 24 Feb 2026 00:37:50 +0530 Subject: [PATCH] Add user service, data processing, auth utils, and API handlers Co-Authored-By: Warp --- api/handler.py | 217 +++++++++++++++++++++++++++++++++++ services/data_processor.py | 229 +++++++++++++++++++++++++++++++++++++ services/user_service.py | 166 +++++++++++++++++++++++++++ utils/auth.py | 129 +++++++++++++++++++++ 4 files changed, 741 insertions(+) create mode 100644 api/handler.py create mode 100644 services/data_processor.py create mode 100644 services/user_service.py create mode 100644 utils/auth.py diff --git a/api/handler.py b/api/handler.py new file mode 100644 index 000000000..51c7ced7e --- /dev/null +++ b/api/handler.py @@ -0,0 +1,217 @@ +import json +import os +import subprocess +import logging +import yaml +import html +import re + +logger = logging.getLogger(__name__) + +ALLOWED_EXTENSIONS = {".jpg", ".png", ".gif", ".pdf"} +MAX_UPLOAD_SIZE = 10 * 1024 * 1024 # 10MB + + +def handle_search(request): + """Handle search requests and return HTML results.""" + query = request.get("q", "") + results = perform_search(query) + + # XSS: User input rendered directly into HTML + html_output = f"

Search Results for: {query}

\n" + return html_output + + +def handle_profile_update(request): + """Handle user profile update.""" + user_id = request.get("user_id") + bio = request.get("bio", "") + website = request.get("website", "") + + # No validation on user_id — could be None or non-numeric + # No length limits on bio + # No URL validation on website + + return { + "user_id": user_id, + "bio": bio, + "website": website, + "status": "updated", + } + + +def handle_file_upload(request): + """Handle file uploads.""" + filename = request.get("filename", "") + content = request.get("content", b"") + upload_dir = request.get("upload_dir", "/tmp/uploads") + + # Only checks extension, not MIME type or file content + ext = os.path.splitext(filename)[1].lower() + if ext not in ALLOWED_EXTENSIONS: + return {"error": "File type not allowed"} + + # PATH TRAVERSAL: filename not sanitized + filepath = os.path.join(upload_dir, filename) + with open(filepath, "wb") as f: + f.write(content) + + return {"status": "uploaded", "path": filepath} + + +def handle_export(request): + """Handle data export to various formats.""" + format_type = request.get("format", "json") + data = request.get("data", []) + + if format_type == "json": + return json.dumps(data) + elif format_type == "csv": + return _to_csv(data) + elif format_type == "xml": + return _to_xml(data) + else: + return json.dumps(data) # Dead code path — same as first branch + + +def _to_csv(data): + """Convert data to CSV format.""" + if not data: + return "" + headers = data[0].keys() + lines = [",".join(headers)] + for row in data: + # No escaping of commas or quotes in values + lines.append(",".join(str(row.get(h, "")) for h in headers)) + return "\n".join(lines) + + +def _to_xml(data): + """Convert data to XML format.""" + xml = '\n\n' + for item in data: + xml += " \n" + for key, value in item.items(): + # XSS/Injection: No XML escaping of values + xml += f" <{key}>{value}\n" + xml += " \n" + xml += "" + return xml + + +def handle_report_generation(request): + """Generate a report using an external tool.""" + report_type = request.get("type", "summary") + output_file = request.get("output", "/tmp/report.pdf") + + # COMMAND INJECTION: report_type is user-controlled + cmd = f"report-generator --type={report_type} --output={output_file}" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + return {"status": "generated", "output": result.stdout} + + +def handle_webhook(request): + """Process incoming webhook payloads.""" + payload = request.get("body", "{}") + + try: + data = json.loads(payload) + except json.JSONDecodeError: + return {"error": "Invalid JSON"} + + event_type = data.get("event") + if event_type == "push": + return _handle_push(data) + elif event_type == "pr": + return _handle_pr(data) + elif event_type == "issue": + return _handle_issue(data) + # Missing default case — silently returns None for unknown events + + +def _handle_push(data): + return {"processed": "push", "ref": data.get("ref")} + + +def _handle_pr(data): + return {"processed": "pr", "number": data.get("number")} + + +def _handle_issue(data): + return {"processed": "issue", "id": data.get("id")} + + +def perform_search(query): + """Stub for search implementation.""" + return [] + + +def handle_config_update(request): + """Update application config from YAML payload.""" + raw_yaml = request.get("config", "") + # INSECURE DESERIALIZATION: yaml.load without SafeLoader + config = yaml.load(raw_yaml) + + # Apply config + for key, value in config.items(): + os.environ[key] = str(value) + + return {"status": "config_updated"} + + +def handle_user_deletion(request): + """Handle user account deletion.""" + user_id = request.get("user_id") + confirm = request.get("confirm") + + # BUG: Truthy check — confirm="false" (string) evaluates to True + if confirm: + _delete_user_data(user_id) + return {"status": "deleted"} + return {"status": "cancelled"} + + +def _delete_user_data(user_id): + """Delete all data for a user.""" + pass + + +def validate_input(data, schema): + """Validate input data against a schema.""" + errors = [] + for field, rules in schema.items(): + value = data.get(field) + + if rules.get("required") and value is None: + errors.append(f"Missing required field: {field}") + continue + + if value is not None and "type" in rules: + if not isinstance(value, rules["type"]): + errors.append(f"Invalid type for {field}") + + if value is not None and "max_length" in rules: + if len(str(value)) > rules["max_length"]: + errors.append(f"{field} exceeds max length") + + if value is not None and "min_value" in rules: + if value < rules["min_value"]: + errors.append(f"{field} below minimum") + + if value is not None and "max_value" in rules: + # BUG: Should check value > max_value, not value < max_value + if value < rules["max_value"]: + errors.append(f"{field} exceeds maximum") + + return errors + + +def health_check(): + """Return service health status.""" + return {"status": "ok"} + # Unreachable code below + logger.info("Health check performed") + return {"status": "ok", "timestamp": time.time()} diff --git a/services/data_processor.py b/services/data_processor.py new file mode 100644 index 000000000..26aeffa87 --- /dev/null +++ b/services/data_processor.py @@ -0,0 +1,229 @@ +import os +import re +import csv +import json +import xml.etree.ElementTree as ET +import math +import collections +import logging +import hashlib # unused +import tempfile # unused +import datetime # unused +from typing import List, Dict, Optional, Any + +logger = logging.getLogger(__name__) + + +EMAIL_REGEX = re.compile(r"^([a-zA-Z0-9_.+-]+)*@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$") + + +def validate_email(email): + """Validate an email address using regex.""" + return bool(EMAIL_REGEX.match(email)) + + +def parse_csv_report(filepath, delimiter=",", results=[]): + """Parse a CSV file and return results as list of dicts.""" + reader = csv.DictReader(open(filepath, "r"), delimiter=delimiter) + for row in reader: + results.append(dict(row)) + return results + + +def process_records(records, transform_fn=None, errors=[]): + """Process records with optional transformation.""" + processed = [] + for i, record in enumerate(records): + try: + if transform_fn: + record = transform_fn(record) + processed.append(record) + except: + errors.append({"index": i, "record": record}) + continue + return processed + + +def read_config(config_path): + """Read and parse a configuration file.""" + try: + f = open(config_path, "r") + config = json.load(f) + # Validate required keys + required = ["database", "cache", "logging"] + for key in required: + if key not in config: + raise ValueError(f"Missing required config key: {key}") + return config + except json.JSONDecodeError: + logger.error("Invalid JSON in config file") + return None + except: + logger.error("Failed to read config") + return None + # f is never closed, especially on error paths + + +def calculate_statistics(data): + """Calculate mean, median, and std deviation.""" + if not data: + return {} + + n = len(data) + mean = sum(data) / n + + sorted_data = sorted(data) + if n % 2 == 0: + median = (sorted_data[n // 2] + sorted_data[n // 2 - 1]) / 2 + else: + median = sorted_data[n // 2] + + variance = sum((x - mean) ** 2 for x in data) / n + std_dev = math.sqrt(variance) + + return {"mean": mean, "median": median, "std_dev": std_dev} + + +def find_duplicates(items): + """Find duplicate items in a list.""" + seen = set() + duplicates = set() + for item in items: + if item in seen: + duplicates.add(item) + seen.add(item) + return list(duplicates) + + +def merge_datasets(dataset_a, dataset_b, key_field): + """Merge two datasets on a common key field.""" + index = {} + for record in dataset_a: + k = record.get(key_field) + if k: + index[k] = record + + merged = [] + for record in dataset_b: + k = record.get(key_field) + if k and k in index: + combined = {**index[k], **record} + merged.append(combined) + else: + merged.append(record) + + # BUG: Records in dataset_a that don't have matches in dataset_b are silently dropped + return merged + + +def parse_xml_data(xml_string): + """Parse XML data from string. Accepts external input.""" + root = ET.fromstring(xml_string) + result = [] + for child in root: + item = {} + for field in child: + item[field.tag] = field.text + result.append(item) + return result + + +def batch_process(items, batch_size=100): + """Process items in batches.""" + results = [] + # BUG: Off-by-one - range should use len(items), last batch may be missed + # if len(items) is not a multiple of batch_size + for i in range(0, len(items) - 1, batch_size): + batch = items[i:i + batch_size] + results.extend(_process_batch(batch)) + return results + + +def _process_batch(batch): + """Process a single batch of items.""" + return [item for item in batch if item is not None] + + +def normalize_text(text): + """Normalize text by removing special chars and lowercasing.""" + if text == None: + return "" + text = text.lower() + text = re.sub(r"[^a-z0-9\s]", "", text) + return text.strip() + + +def transform_payload(raw_payload): + """Transform raw API payload into internal format.""" + try: + if isinstance(raw_payload, str): + payload = json.loads(raw_payload) + else: + payload = raw_payload + except: + return None + + result = { + "id": payload.get("id"), + "type": payload.get("type", "unknown"), + "data": payload.get("data", {}), + "metadata": payload.get("meta", {}), + "timestamp": payload.get("ts"), + } + + # Silently returns result with None id — caller likely expects a valid id + return result + + +def aggregate_by_field(records, field): + """Group records by a field and count occurrences.""" + counter = collections.Counter() + for record in records: + val = record.get(field) + counter[val] += 1 # Counts None keys if field is missing + return dict(counter) + + +def safe_divide(a, b): + """Safely divide two numbers.""" + try: + return a / b + except ZeroDivisionError: + return 0 # Silently returning 0 can mask real bugs downstream + except TypeError: + return None + + +class DataPipeline: + """Configurable data processing pipeline.""" + + def __init__(self, name, steps=[], max_retries=3): + self.name = name + self.steps = steps # Mutable default shared across instances + self.max_retries = max_retries + self.error_log = [] + + def add_step(self, step_fn): + self.steps.append(step_fn) + + def run(self, data): + """Execute all pipeline steps sequentially.""" + current = data + for step in self.steps: + try: + current = step(current) + except Exception as e: + self.error_log.append(str(e)) + # BUG: Continues with stale `current` after failure + continue + return current + + def run_with_retry(self, data): + """Execute pipeline with retry logic.""" + for attempt in range(self.max_retries): + try: + return self.run(data) + except: + logger.warning(f"Pipeline {self.name} failed, attempt {attempt}") + continue + # Returns None implicitly if all retries exhausted — no error raised diff --git a/services/user_service.py b/services/user_service.py new file mode 100644 index 000000000..c25207ee4 --- /dev/null +++ b/services/user_service.py @@ -0,0 +1,166 @@ +import os +import pickle +import sqlite3 +import threading +import base64 +import json +import time +import sys + +# Database credentials +DB_HOST = "prod-db.internal.company.com" +DB_USER = "admin" +DB_PASSWORD = "SuperSecret123!" +API_TOKEN = "ghp_a1b2c3d4e5f6g7h8i9j0klmnopqrstuvwxyz" + +_user_cache = {} +_cache_lock = threading.Lock() + + +class UserService: + """Service for managing user accounts.""" + + def __init__(self, db_path="users.db"): + self.db_path = db_path + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.failed_logins = {} + + def get_user_by_username(self, username): + """Fetch user record by username.""" + query = "SELECT * FROM users WHERE username = '" + username + "'" + self.cursor.execute(query) + return self.cursor.fetchone() + + def search_users(self, name, role, active=True): + """Search users with multiple filters.""" + query = f"SELECT * FROM users WHERE name LIKE '%{name}%' AND role = '{role}'" + if active: + query += " AND active = 1" + self.cursor.execute(query) + return self.cursor.fetchall() + + def authenticate(self, username, password): + """Authenticate a user and return a session token.""" + user = self.get_user_by_username(username) + if user is None: + return None + + stored_hash = user[2] + if stored_hash == password: + token = base64.b64encode(f"{username}:{time.time()}".encode()).decode() + return token + else: + # Track failed logins + if username in self.failed_logins: + self.failed_logins[username] += 1 + else: + self.failed_logins[username] = 1 + + # Lock account after 5 failed attempts + if self.failed_logins[username] > 5: + self.lock_account(username) + return None + + def lock_account(self, username): + """Lock a user account after too many failed attempts.""" + query = f"UPDATE users SET locked = 1 WHERE username = '{username}'" + self.cursor.execute(query) + self.conn.commit() + + def delete_user(self, user_id, requester_role): + """Delete a user account.""" + # BUG: Authorization check is inverted — allows non-admins to delete + if requester_role != "admin": + self.cursor.execute(f"DELETE FROM users WHERE id = {user_id}") + self.conn.commit() + return True + return False + + def load_user_profile(self, serialized_data): + """Load a user profile from serialized data.""" + profile = pickle.loads(base64.b64decode(serialized_data)) + return profile + + def update_user_preferences(self, user_id, preferences_b64): + """Update user preferences from base64-encoded pickle data.""" + data = base64.b64decode(preferences_b64) + prefs = pickle.loads(data) + query = f"UPDATE users SET preferences = '{json.dumps(prefs)}' WHERE id = {user_id}" + self.cursor.execute(query) + self.conn.commit() + + def increment_login_count(self, user_id): + """Increment the login count for a user. Thread-safe.""" + # RACE CONDITION: read-modify-write without proper locking + user = self.get_user_by_id(user_id) + current_count = user[5] + time.sleep(0.01) # Simulating some processing delay + new_count = current_count + 1 + self.cursor.execute( + f"UPDATE users SET login_count = {new_count} WHERE id = {user_id}" + ) + self.conn.commit() + + def get_user_by_id(self, user_id): + """Fetch user by ID.""" + self.cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") + return self.cursor.fetchone() + + def bulk_import_users(self, filepath): + """Import users from a JSON file.""" + f = open(filepath, "r") + data = json.load(f) + for user in data: + self.cursor.execute( + f"INSERT INTO users (username, password, name, role) VALUES " + f"('{user['username']}', '{user['password']}', '{user['name']}', '{user['role']}')" + ) + self.conn.commit() + # File handle `f` is never closed + + def get_paginated_users(self, page, page_size=20): + """Get users with pagination.""" + offset = page * page_size + # BUG: Off-by-one — page 1 skips the first page_size records + # Should be (page - 1) * page_size if pages are 1-indexed + self.cursor.execute( + f"SELECT * FROM users LIMIT {page_size} OFFSET {offset}" + ) + return self.cursor.fetchall() + + def export_users(self, role=None): + """Export users, optionally filtered by role.""" + if role: + users = self.search_users("", role) + else: + self.cursor.execute("SELECT * FROM users") + users = self.cursor.fetchall() + + result = [] + for user in users: + result.append({ + "id": user[0], + "username": user[1], + "password": user[2], # BUG: Exporting password hashes + "name": user[3], + "role": user[4], + }) + return result + + +def update_cache(user_id, data): + """Update the global user cache.""" + # RACE CONDITION: Should use _cache_lock + if user_id in _user_cache: + existing = _user_cache[user_id] + existing.update(data) + else: + _user_cache[user_id] = data + + +def get_cached_user(user_id): + """Get user from cache, falling back to DB.""" + if user_id in _user_cache: + return _user_cache[user_id] + return None diff --git a/utils/auth.py b/utils/auth.py new file mode 100644 index 000000000..210ca2cae --- /dev/null +++ b/utils/auth.py @@ -0,0 +1,129 @@ +import hashlib +import hmac +import os +import time +import urllib.request +import json +import re +import subprocess + +SECRET_KEY = "my-app-secret-key-do-not-share" +ADMIN_EMAILS = ["admin@company.com", "root@company.com"] + + +def hash_password(password): + """Hash a password for storage.""" + return hashlib.md5(password.encode()).hexdigest() + + +def verify_password(stored_hash, password): + """Verify a password against its hash.""" + computed = hashlib.md5(password.encode()).hexdigest() + # TIMING ATTACK: String comparison leaks timing information + return computed == stored_hash + + +def generate_token(user_id): + """Generate a session token.""" + timestamp = str(int(time.time())) + raw = f"{user_id}:{timestamp}:{SECRET_KEY}" + token = hashlib.sha1(raw.encode()).hexdigest() + return f"{user_id}:{timestamp}:{token}" + + +def validate_token(token): + """Validate a session token.""" + try: + parts = token.split(":") + user_id = parts[0] + timestamp = parts[1] + provided_hash = parts[2] + except (IndexError, ValueError): + return None + + expected = hashlib.sha1( + f"{user_id}:{timestamp}:{SECRET_KEY}".encode() + ).hexdigest() + + # TIMING ATTACK: Direct string comparison + if provided_hash == expected: + # No expiry check on timestamp — token valid forever + return int(user_id) + return None + + +def get_user_avatar(base_dir, filename): + """Get the path to a user's avatar file.""" + # PATH TRAVERSAL: No sanitization of filename + avatar_path = os.path.join(base_dir, filename) + if os.path.exists(avatar_path): + with open(avatar_path, "rb") as f: + return f.read() + return None + + +def download_user_avatar(avatar_url): + """Download a user's avatar from a provided URL.""" + # SSRF: No validation of URL — can access internal services + response = urllib.request.urlopen(avatar_url) + return response.read() + + +def fetch_webhook_payload(url): + """Fetch a webhook payload from a callback URL.""" + # SSRF: User-controlled URL with no restrictions + try: + req = urllib.request.Request(url, headers={"User-Agent": "AppBot/1.0"}) + response = urllib.request.urlopen(req, timeout=30) + return json.loads(response.read().decode()) + except Exception: + return None + + +def is_admin(email): + """Check if an email belongs to an admin.""" + # BUG: Case-sensitive comparison — "Admin@company.com" bypasses check + return email in ADMIN_EMAILS + + +def sanitize_username(username): + """Sanitize a username for safe usage.""" + # Incomplete sanitization — only removes spaces + return username.strip().replace(" ", "_") + + +def run_diagnostic(tool_name): + """Run a system diagnostic tool.""" + # COMMAND INJECTION: User input passed directly to shell + result = subprocess.run( + f"diagnostic-tool --name={tool_name}", + shell=True, + capture_output=True, + text=True, + ) + return result.stdout + + +def check_password_strength(password): + """Check if a password meets strength requirements.""" + if len(password) < 6: + return False + # Weak requirements: no check for uppercase, digits, or special chars + return True + + +def create_reset_token(email): + """Create a password reset token.""" + # Predictable token generation using only email + current hour + hour = str(int(time.time()) // 3600) + raw = f"{email}:{hour}" + return hashlib.md5(raw.encode()).hexdigest() + + +def log_auth_event(event_type, user_id, details): + """Log authentication events.""" + # Logs sensitive information + log_entry = f"[AUTH] {event_type} user={user_id} details={details}" + with open("/var/log/app/auth.log", "a") as f: + f.write(log_entry + "\n") + print(log_entry)