From 49cac23174d2a3552d5f9a66f96ca0d560ad90a7 Mon Sep 17 00:00:00 2001 From: ReenigneArcher <42013603+ReenigneArcher@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:01:40 -0500 Subject: [PATCH 1/2] fix(migrations): add migration lock and batch DB updates Introduce a global MIGRATION_LOCK to prevent concurrent migrations and refactor Reddit/Mee6 import flows into internal methods. Migrations now disable database.GIT_ENABLED and restore its original state on completion or error. Reddit migration applies updates in a single transactional context by building an existing-users map and performing inserts/updates with progress logging. Mee6 migration collects updates asynchronously and applies them in batched transactions, with improved error handling and final DB sync. --- src/common/rank.py | 206 ++++++++++++++++++++++++++++++++------------- 1 file changed, 147 insertions(+), 59 deletions(-) diff --git a/src/common/rank.py b/src/common/rank.py index 1f697b0..fa81192 100644 --- a/src/common/rank.py +++ b/src/common/rank.py @@ -2,6 +2,7 @@ from datetime import datetime, UTC import math import random +import threading import time from typing import Dict, List, Optional, Tuple, Union @@ -16,6 +17,9 @@ from src.common.rank_database import RankDatabase from src.discord_bot.bot import Bot +# Global migration lock to prevent concurrent migrations +MIGRATION_LOCK = threading.Lock() + class RankSystem: """ @@ -453,6 +457,17 @@ def migrate_from_reddit_database( Dict[str, Union[int, str]] Migration statistics """ + # Acquire migration lock to prevent concurrent migrations + with MIGRATION_LOCK: + return self._do_reddit_migration(reddit_bot, reddit_db, community_id) + + def _do_reddit_migration( + self, + reddit_bot, + reddit_db, + community_id: str, + ) -> Dict[str, Union[int, str]]: + """Internal method that performs the actual Reddit migration.""" total_users = 0 new_users = 0 updated_users = 0 @@ -462,6 +477,7 @@ def migrate_from_reddit_database( skipped_comments = 0 user_xp_map = {} # Maps user_id to accumulated XP + original_git_enabled = database.GIT_ENABLED database.GIT_ENABLED = False # Disable Git for this operation print("Starting Reddit ranks migration") @@ -550,46 +566,64 @@ def migrate_from_reddit_database( total_users = len(user_xp_map) print(f"Updating {total_users} users in rank database") - for user_id, stats in user_xp_map.items(): - database.GIT_ENABLED = False # set this on every iteration in case it was enabled somewhere else - - # Get existing user data or create new - user_data = self.db.get_user_data( - platform='reddit', - community_id=community_id, - user_id=user_id, - create_if_not_exists=True, - ) - - if user_data.get('xp', 0) > 0: - # User already has XP - updated_users += 1 - else: - # New user or user with no XP - new_users += 1 - - # Update user with imported data - user_data['xp'] = stats['xp'] - user_data['message_count'] = stats['submissions'] + stats['comments'] - user_data['submission_count'] = stats['submissions'] - user_data['comment_count'] = stats['comments'] - user_data['username'] = stats['name'] - user_data['reddit_import_date'] = datetime.now(UTC).isoformat() - - self.db.update_user_data( - platform='reddit', - community_id=community_id, - user_id=user_id, - data=user_data, - ) + # Apply all updates in one transaction + with self.db as db: + table = db.table('reddit_users') + + # Build a lookup map of existing users for faster access + existing_users_map = {} + for item in table.all(): + key = (item.get('user_id'), item.get('community_id')) + existing_users_map[key] = item + print(f"Found {len(existing_users_map)} existing users") + + # Process each user + processed = 0 + for user_id, stats in user_xp_map.items(): + key = (user_id, community_id) + existing = existing_users_map.get(key) + + if existing and existing.get('xp', 0) > 0: + # User already has XP + updated_users += 1 + else: + # New user or user with no XP + new_users += 1 + + # Prepare user data + user_data = { + 'user_id': user_id, + 'community_id': community_id, + 'xp': stats['xp'], + 'message_count': stats['submissions'] + stats['comments'], + 'submission_count': stats['submissions'], + 'comment_count': stats['comments'], + 'username': stats['name'], + 'reddit_import_date': datetime.now(UTC).isoformat(), + } + + # Update or insert + if existing: + table.update(user_data, doc_ids=[existing.doc_id]) + else: + table.insert(user_data) + + processed += 1 + # Print progress every 50 users + if processed % 50 == 0: + print(f"Progress: {processed}/{total_users} users processed") + + print(f"Finished updating {total_users} users") except Exception as e: print(f"Error during Reddit migration: {type(e).__name__}: {e}") - # Re-raise after enabling Git - database.GIT_ENABLED = True + # Re-raise after restoring Git state + database.GIT_ENABLED = original_git_enabled raise - database.GIT_ENABLED = True # Re-enable Git after operation + # Restore original Git state and force one final sync + database.GIT_ENABLED = original_git_enabled + self.db.sync() stats = { 'total_users': total_users, @@ -621,11 +655,28 @@ async def migrate_from_mee6(self, guild_id: int) -> Dict[str, Union[int, str]]: Dict[str, Union[int, str]] Migration statistics """ + import asyncio + + # Use async-friendly locking + loop = asyncio.get_event_loop() + + # Acquire lock in a thread-safe way for async + await loop.run_in_executor(None, MIGRATION_LOCK.acquire) + + try: + return await self._do_mee6_migration(guild_id) + finally: + MIGRATION_LOCK.release() + + async def _do_mee6_migration(self, guild_id: int) -> Dict[str, Union[int, str]]: + """Internal method that performs the actual Mee6 migration.""" page = 0 total_users = 0 new_users = 0 updated_users = 0 + batch_updates = [] # Collect all updates before applying + original_git_enabled = database.GIT_ENABLED database.GIT_ENABLED = False # Disable Git for this operation async with aiohttp.ClientSession() as session: @@ -650,43 +701,80 @@ async def migrate_from_mee6(self, guild_id: int) -> Dict[str, Union[int, str]]: print(f"Processing {player_count} players from page {page}") for player in data['players']: - database.GIT_ENABLED = False # set this on every loop in case it's updated in another place - total_users += 1 user_id = int(player['id']) - # Get existing user data or create new - user_data = self.db.get_user_data( - platform='discord', - community_id=guild_id, - user_id=user_id, - create_if_not_exists=True, - ) - - if user_data.get('xp', 0) > 0: - # User already has XP, skip or update as needed - updated_users += 1 - continue - - # Update user with imported data - user_data['xp'] = player['xp'] - user_data['message_count'] = player.get('message_count', 0) - user_data['username'] = player.get('username', f"User {user_id}") - user_data['mee6_import_date'] = datetime.now(UTC).isoformat() - - self.db.update_user_data('discord', guild_id, user_id, user_data) - new_users += 1 + # Collect update data without writing yet + user_update = { + 'user_id': user_id, + 'xp': player['xp'], + 'message_count': player.get('message_count', 0), + 'username': player.get('username', f"User {user_id}"), + 'mee6_import_date': datetime.now(UTC).isoformat(), + } + batch_updates.append(user_update) except aiohttp.ClientError as e: print(f"HTTP error during migration: {e}") break except Exception as e: print(f"Unexpected error during migration: {e}") + import traceback + traceback.print_exc() break page += 1 - database.GIT_ENABLED = True # Re-enable Git after operation + # Now apply all updates in batches to avoid multiple commits + print(f"Applying {len(batch_updates)} user updates in batches") + BATCH_SIZE = 100 + + for i in range(0, len(batch_updates), BATCH_SIZE): + batch = batch_updates[i:i+BATCH_SIZE] + + # Process batch within a single database context + with self.db as db: + table = db.table('discord_users') + + # Build a lookup map of existing users in this batch + existing_users_map = {} + for item in table.all(): + if item.get('community_id') == guild_id: + existing_users_map[item.get('user_id')] = item + + for user_update in batch: + user_id = user_update['user_id'] + existing = existing_users_map.get(user_id) + + if existing and existing.get('xp', 0) > 0: + # User already has XP + updated_users += 1 + else: + # New user + new_users += 1 + + # Prepare user data + user_data = { + 'user_id': user_id, + 'community_id': guild_id, + 'xp': user_update['xp'], + 'message_count': user_update['message_count'], + 'username': user_update['username'], + 'mee6_import_date': user_update['mee6_import_date'], + } + + # Update or insert + if existing: + table.update(user_data, doc_ids=[existing.doc_id]) + else: + table.insert(user_data) + + print(f"Completed batch {i//BATCH_SIZE + 1}/{(len(batch_updates)-1)//BATCH_SIZE + 1}") + + # Restore original Git state and force one final sync + database.GIT_ENABLED = original_git_enabled + self.db.sync() + stats = { 'total_processed': total_users, From c90533d43af0117f9a3aaa3e76e95cbcad1433f2 Mon Sep 17 00:00:00 2001 From: ReenigneArcher <42013603+ReenigneArcher@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:11:46 -0500 Subject: [PATCH 2/2] Refactor Reddit and Mee6 migration logic Extract reusable helpers and simplify migration flows for Reddit and Mee6 imports. Added _process_reddit_item and _update_reddit_rank_database to centralize author handling, XP aggregation, DB upsert logic and progress logging; this reduces duplication and improves error handling. For Mee6, added _fetch_mee6_page (with timeout/error handling) and _process_mee6_batch, and changed the migration to fetch pages and apply updates in batches, reducing DB churn. Also updated final stats to use derived counts and restored original Git state before final sync. Overall aims: clearer structure, fewer nested try/except blocks, better logging, and more efficient DB writes. --- src/common/rank.py | 470 ++++++++++++++++++++++++++------------------- 1 file changed, 277 insertions(+), 193 deletions(-) diff --git a/src/common/rank.py b/src/common/rank.py index fa81192..e2b505b 100644 --- a/src/common/rank.py +++ b/src/common/rank.py @@ -461,6 +461,136 @@ def migrate_from_reddit_database( with MIGRATION_LOCK: return self._do_reddit_migration(reddit_bot, reddit_db, community_id) + def _process_reddit_item( + self, + reddit_bot, + item: dict, + item_type: str, + user_xp_map: dict, + ) -> Tuple[bool, bool]: + """ + Process a single Reddit submission or comment. + + Parameters + ---------- + reddit_bot + Reddit bot instance + item : dict + Submission or comment data + item_type : str + 'submission' or 'comment' + user_xp_map : dict + Map of user_id to XP stats + + Returns + ------- + Tuple[bool, bool] + (success, skipped) - True if processed successfully, True if skipped + """ + author_name = item.get('author') + if not author_name or author_name == "[deleted]": + return False, True + + try: + author = reddit_bot.fetch_user(name=author_name) + except Exception as e: + print(f"Error fetching author '{author_name}' for {item_type}: {type(e).__name__}: {e}") + return False, True + + # Skip items without valid author + if not author or not hasattr(author, 'id'): + print(f"Invalid author object for '{author_name}', skipping {item_type}") + return False, True + + # Award random XP in range + xp_gain = random.randint(150, 250) + + if author.id not in user_xp_map: + user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name} + + user_xp_map[author.id]['xp'] += xp_gain + if item_type == 'submission': + user_xp_map[author.id]['submissions'] += 1 + else: + user_xp_map[author.id]['comments'] += 1 + + return True, False + + def _update_reddit_rank_database( + self, + community_id: str, + user_xp_map: dict, + ) -> Tuple[int, int]: + """ + Update the rank database with accumulated XP. + + Parameters + ---------- + community_id : str + Subreddit ID + user_xp_map : dict + Map of user_id to XP stats + + Returns + ------- + Tuple[int, int] + (new_users, updated_users) + """ + new_users = 0 + updated_users = 0 + total_users = len(user_xp_map) + + print(f"Updating {total_users} users in rank database") + + with self.db as db: + table = db.table('reddit_users') + + # Build a lookup map of existing users for faster access + print("Building lookup map of existing users...") + existing_users_map = {} + for item in table.all(): + key = (item.get('user_id'), item.get('community_id')) + existing_users_map[key] = item + print(f"Found {len(existing_users_map)} existing users") + + # Process each user + processed = 0 + for user_id, stats in user_xp_map.items(): + key = (user_id, community_id) + existing = existing_users_map.get(key) + + if existing and existing.get('xp', 0) > 0: + updated_users += 1 + else: + new_users += 1 + + # Prepare user data + user_data = { + 'user_id': user_id, + 'community_id': community_id, + 'xp': stats['xp'], + 'message_count': stats['submissions'] + stats['comments'], + 'submission_count': stats['submissions'], + 'comment_count': stats['comments'], + 'username': stats['name'], + 'reddit_import_date': datetime.now(UTC).isoformat(), + } + + # Update or insert + if existing: + table.update(user_data, doc_ids=[existing.doc_id]) + else: + table.insert(user_data) + + processed += 1 + # Print progress every 50 users + if processed % 50 == 0: + print(f"Progress: {processed}/{total_users} users processed") + + print(f"Finished updating {total_users} users") + + return new_users, updated_users + def _do_reddit_migration( self, reddit_bot, @@ -468,9 +598,6 @@ def _do_reddit_migration( community_id: str, ) -> Dict[str, Union[int, str]]: """Internal method that performs the actual Reddit migration.""" - total_users = 0 - new_users = 0 - updated_users = 0 total_submissions = 0 total_comments = 0 skipped_submissions = 0 @@ -489,135 +616,42 @@ def _do_reddit_migration( print(f"Processing {len(submissions_table.all())} submissions") for submission in submissions_table.all(): - author_name = submission.get('author') - if not author_name or author_name == "[deleted]": - skipped_submissions += 1 - continue - try: - try: - author = reddit_bot.fetch_user(name=author_name) - except Exception as e: - print(f"Error fetching author '{author_name}' for submission: {type(e).__name__}: {e}") - skipped_submissions += 1 - continue - - # Skip submissions without valid author - if not author or not hasattr(author, 'id'): - print(f"Invalid author object for '{author_name}', skipping submission") + success, skipped = self._process_reddit_item( + reddit_bot, submission, 'submission', user_xp_map + ) + if success: + total_submissions += 1 + elif skipped: skipped_submissions += 1 - continue - - total_submissions += 1 - - # Award random XP in range - xp_gain = random.randint(150, 250) - - if author.id not in user_xp_map: - user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name} - - user_xp_map[author.id]['xp'] += xp_gain - user_xp_map[author.id]['submissions'] += 1 except Exception as e: + author_name = submission.get('author', 'unknown') print(f"Unexpected error processing submission by '{author_name}': {type(e).__name__}: {e}") skipped_submissions += 1 - continue # Process comments comments_table = db.table('comments') print(f"Processing {len(comments_table.all())} comments") for comment in comments_table.all(): - author_name = comment.get('author') - if not author_name or author_name == "[deleted]": - skipped_comments += 1 - continue - try: - try: - author = reddit_bot.fetch_user(name=author_name) - except Exception as e: - print(f"Error fetching author '{author_name}' for comment: {type(e).__name__}: {e}") + success, skipped = self._process_reddit_item( + reddit_bot, comment, 'comment', user_xp_map + ) + if success: + total_comments += 1 + elif skipped: skipped_comments += 1 - continue - - # Skip comments without valid author - if not author or not hasattr(author, 'id'): - print(f"Invalid author object for '{author_name}', skipping comment") - skipped_comments += 1 - continue - - total_comments += 1 - - # Award random XP in range - xp_gain = random.randint(150, 250) - - if author.id not in user_xp_map: - user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name} - - user_xp_map[author.id]['xp'] += xp_gain - user_xp_map[author.id]['comments'] += 1 except Exception as e: + author_name = comment.get('author', 'unknown') print(f"Unexpected error processing comment by '{author_name}': {type(e).__name__}: {e}") skipped_comments += 1 - continue - - # Now update the rank database with accumulated XP - total_users = len(user_xp_map) - print(f"Updating {total_users} users in rank database") - - # Apply all updates in one transaction - with self.db as db: - table = db.table('reddit_users') - - # Build a lookup map of existing users for faster access - existing_users_map = {} - for item in table.all(): - key = (item.get('user_id'), item.get('community_id')) - existing_users_map[key] = item - print(f"Found {len(existing_users_map)} existing users") - - # Process each user - processed = 0 - for user_id, stats in user_xp_map.items(): - key = (user_id, community_id) - existing = existing_users_map.get(key) - - if existing and existing.get('xp', 0) > 0: - # User already has XP - updated_users += 1 - else: - # New user or user with no XP - new_users += 1 - - # Prepare user data - user_data = { - 'user_id': user_id, - 'community_id': community_id, - 'xp': stats['xp'], - 'message_count': stats['submissions'] + stats['comments'], - 'submission_count': stats['submissions'], - 'comment_count': stats['comments'], - 'username': stats['name'], - 'reddit_import_date': datetime.now(UTC).isoformat(), - } - - # Update or insert - if existing: - table.update(user_data, doc_ids=[existing.doc_id]) - else: - table.insert(user_data) - - processed += 1 - # Print progress every 50 users - if processed % 50 == 0: - print(f"Progress: {processed}/{total_users} users processed") - print(f"Finished updating {total_users} users") + # Update the rank database + new_users, updated_users = self._update_reddit_rank_database(community_id, user_xp_map) except Exception as e: print(f"Error during Reddit migration: {type(e).__name__}: {e}") - # Re-raise after restoring Git state database.GIT_ENABLED = original_git_enabled raise @@ -626,7 +660,7 @@ def _do_reddit_migration( self.db.sync() stats = { - 'total_users': total_users, + 'total_users': len(user_xp_map), 'new_users': new_users, 'updated_users': updated_users, 'total_submissions': total_submissions, @@ -668,6 +702,116 @@ async def migrate_from_mee6(self, guild_id: int) -> Dict[str, Union[int, str]]: finally: MIGRATION_LOCK.release() + async def _fetch_mee6_page( + self, + session: aiohttp.ClientSession, + guild_id: int, + page: int, + ) -> Optional[list]: + """ + Fetch a single page of Mee6 data. + + Parameters + ---------- + session : aiohttp.ClientSession + HTTP session + guild_id : int + Discord guild ID + page : int + Page number to fetch + + Returns + ------- + Optional[list] + List of player data, or None if fetch failed + """ + url = f"https://mee6.xyz/api/plugins/levels/leaderboard/{guild_id}?page={page}" + print(f"Fetching Mee6 data from: {url}") + + try: + async with session.get(url, timeout=10) as response: + if response.status != 200: + print(f"Received status code {response.status}, stopping migration") + return None + + data = await response.json() + + if not data.get('players') or len(data['players']) == 0: + print("No more players found, stopping migration") + return None + + player_count = len(data['players']) + print(f"Processing {player_count} players from page {page}") + return data['players'] + + except aiohttp.ClientError as e: + print(f"HTTP error during migration: {e}") + return None + except Exception as e: + print(f"Unexpected error during migration: {e}") + import traceback + traceback.print_exc() + return None + + def _process_mee6_batch( + self, + guild_id: int, + batch: list, + ) -> Tuple[int, int]: + """ + Process a batch of Mee6 user updates. + + Parameters + ---------- + guild_id : int + Discord guild ID + batch : list + List of user update dictionaries + + Returns + ------- + Tuple[int, int] + (new_users, updated_users) + """ + new_users = 0 + updated_users = 0 + + with self.db as db: + table = db.table('discord_users') + + # Build a lookup map of existing users in this batch + existing_users_map = {} + for item in table.all(): + if item.get('community_id') == guild_id: + existing_users_map[item.get('user_id')] = item + + for user_update in batch: + user_id = user_update['user_id'] + existing = existing_users_map.get(user_id) + + if existing and existing.get('xp', 0) > 0: + updated_users += 1 + else: + new_users += 1 + + # Prepare user data + user_data = { + 'user_id': user_id, + 'community_id': guild_id, + 'xp': user_update['xp'], + 'message_count': user_update['message_count'], + 'username': user_update['username'], + 'mee6_import_date': user_update['mee6_import_date'], + } + + # Update or insert + if existing: + table.update(user_data, doc_ids=[existing.doc_id]) + else: + table.insert(user_data) + + return new_users, updated_users + async def _do_mee6_migration(self, guild_id: int) -> Dict[str, Union[int, str]]: """Internal method that performs the actual Mee6 migration.""" page = 0 @@ -681,101 +825,41 @@ async def _do_mee6_migration(self, guild_id: int) -> Dict[str, Union[int, str]]: async with aiohttp.ClientSession() as session: while True: - url = f"https://mee6.xyz/api/plugins/levels/leaderboard/{guild_id}?page={page}" - print(f"Fetching Mee6 data from: {url}") - - try: - # Add timeout to prevent hanging - async with session.get(url, timeout=10) as response: - if response.status != 200: - print(f"Received status code {response.status}, stopping migration") - break - - data = await response.json() - - if not data.get('players') or len(data['players']) == 0: - print("No more players found, stopping migration") - break - - player_count = len(data['players']) - print(f"Processing {player_count} players from page {page}") - - for player in data['players']: - total_users += 1 - user_id = int(player['id']) - - # Collect update data without writing yet - user_update = { - 'user_id': user_id, - 'xp': player['xp'], - 'message_count': player.get('message_count', 0), - 'username': player.get('username', f"User {user_id}"), - 'mee6_import_date': datetime.now(UTC).isoformat(), - } - batch_updates.append(user_update) - - except aiohttp.ClientError as e: - print(f"HTTP error during migration: {e}") - break - except Exception as e: - print(f"Unexpected error during migration: {e}") - import traceback - traceback.print_exc() + players = await self._fetch_mee6_page(session, guild_id, page) + if players is None: break + for player in players: + total_users += 1 + user_id = int(player['id']) + + # Collect update data without writing yet + user_update = { + 'user_id': user_id, + 'xp': player['xp'], + 'message_count': player.get('message_count', 0), + 'username': player.get('username', f"User {user_id}"), + 'mee6_import_date': datetime.now(UTC).isoformat(), + } + batch_updates.append(user_update) + page += 1 - # Now apply all updates in batches to avoid multiple commits + # Now apply all updates in batches print(f"Applying {len(batch_updates)} user updates in batches") BATCH_SIZE = 100 for i in range(0, len(batch_updates), BATCH_SIZE): batch = batch_updates[i:i+BATCH_SIZE] - - # Process batch within a single database context - with self.db as db: - table = db.table('discord_users') - - # Build a lookup map of existing users in this batch - existing_users_map = {} - for item in table.all(): - if item.get('community_id') == guild_id: - existing_users_map[item.get('user_id')] = item - - for user_update in batch: - user_id = user_update['user_id'] - existing = existing_users_map.get(user_id) - - if existing and existing.get('xp', 0) > 0: - # User already has XP - updated_users += 1 - else: - # New user - new_users += 1 - - # Prepare user data - user_data = { - 'user_id': user_id, - 'community_id': guild_id, - 'xp': user_update['xp'], - 'message_count': user_update['message_count'], - 'username': user_update['username'], - 'mee6_import_date': user_update['mee6_import_date'], - } - - # Update or insert - if existing: - table.update(user_data, doc_ids=[existing.doc_id]) - else: - table.insert(user_data) - + batch_new, batch_updated = self._process_mee6_batch(guild_id, batch) + new_users += batch_new + updated_users += batch_updated print(f"Completed batch {i//BATCH_SIZE + 1}/{(len(batch_updates)-1)//BATCH_SIZE + 1}") # Restore original Git state and force one final sync database.GIT_ENABLED = original_git_enabled self.db.sync() - stats = { 'total_processed': total_users, 'new_users': new_users,