diff --git a/src/common/rank.py b/src/common/rank.py
index 1f697b0..e2b505b 100644
--- a/src/common/rank.py
+++ b/src/common/rank.py
@@ -2,6 +2,7 @@
 from datetime import datetime, UTC
 import math
 import random
+import threading
 import time
 from typing import Dict, List, Optional, Tuple, Union
 
@@ -16,6 +17,9 @@
 from src.common.rank_database import RankDatabase
 from src.discord_bot.bot import Bot
 
+# Global migration lock to prevent concurrent migrations
+MIGRATION_LOCK = threading.Lock()
+
 
 class RankSystem:
     """
@@ -453,15 +457,154 @@ def migrate_from_reddit_database(
         Dict[str, Union[int, str]]
             Migration statistics
         """
-        total_users = 0
+        # Acquire migration lock to prevent concurrent migrations
+        with MIGRATION_LOCK:
+            return self._do_reddit_migration(reddit_bot, reddit_db, community_id)
+
+    def _process_reddit_item(
+        self,
+        reddit_bot,
+        item: dict,
+        item_type: str,
+        user_xp_map: dict,
+    ) -> Tuple[bool, bool]:
+        """
+        Process a single Reddit submission or comment.
+
+        Parameters
+        ----------
+        reddit_bot
+            Reddit bot instance
+        item : dict
+            Submission or comment data
+        item_type : str
+            'submission' or 'comment'
+        user_xp_map : dict
+            Map of user_id to XP stats
+
+        Returns
+        -------
+        Tuple[bool, bool]
+            (success, skipped): success is True when the item was processed
+            and XP awarded; skipped is True when the item was skipped.
+        """
+        author_name = item.get('author')
+        if not author_name or author_name == "[deleted]":
+            return False, True
+
+        try:
+            author = reddit_bot.fetch_user(name=author_name)
+        except Exception as e:
+            print(f"Error fetching author '{author_name}' for {item_type}: {type(e).__name__}: {e}")
+            return False, True
+
+        # Skip items without a valid author
+        if not author or not hasattr(author, 'id'):
+            print(f"Invalid author object for '{author_name}', skipping {item_type}")
+            return False, True
+
+        # Award random XP in range
+        xp_gain = random.randint(150, 250)
+
+        if author.id not in user_xp_map:
+            user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name}
+
+        user_xp_map[author.id]['xp'] += xp_gain
+        if item_type == 'submission':
+            user_xp_map[author.id]['submissions'] += 1
+        else:
+            user_xp_map[author.id]['comments'] += 1
+
+        return True, False
+
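+    # Design note: XP totals are accumulated in memory (user_xp_map) while
+    # items are processed, then flushed in a single pass by
+    # _update_reddit_rank_database, so each user is written once per run.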
+    def _update_reddit_rank_database(
+        self,
+        community_id: str,
+        user_xp_map: dict,
+    ) -> Tuple[int, int]:
+        """
+        Update the rank database with accumulated XP.
+
+        Parameters
+        ----------
+        community_id : str
+            Subreddit ID
+        user_xp_map : dict
+            Map of user_id to XP stats
+
+        Returns
+        -------
+        Tuple[int, int]
+            (new_users, updated_users)
+        """
         new_users = 0
         updated_users = 0
+        total_users = len(user_xp_map)
+
+        print(f"Updating {total_users} users in rank database")
+
+        with self.db as db:
+            table = db.table('reddit_users')
+
+            # Build a lookup map of existing users for faster access
+            print("Building lookup map of existing users...")
+            existing_users_map = {}
+            for item in table.all():
+                key = (item.get('user_id'), item.get('community_id'))
+                existing_users_map[key] = item
+            print(f"Found {len(existing_users_map)} existing users")
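+            # (A single upfront scan makes the per-user existence check
+            # below a dict lookup instead of a separate query per user.)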
+
+            # Process each user
+            processed = 0
+            for user_id, stats in user_xp_map.items():
+                key = (user_id, community_id)
+                existing = existing_users_map.get(key)
+
+                if existing and existing.get('xp', 0) > 0:
+                    updated_users += 1
+                else:
+                    new_users += 1
+
+                # Prepare user data
+                user_data = {
+                    'user_id': user_id,
+                    'community_id': community_id,
+                    'xp': stats['xp'],
+                    'message_count': stats['submissions'] + stats['comments'],
+                    'submission_count': stats['submissions'],
+                    'comment_count': stats['comments'],
+                    'username': stats['name'],
+                    'reddit_import_date': datetime.now(UTC).isoformat(),
+                }
+
+                # Update or insert
+                if existing:
+                    table.update(user_data, doc_ids=[existing.doc_id])
+                else:
+                    table.insert(user_data)
+
+                processed += 1
+                # Print progress every 50 users
+                if processed % 50 == 0:
+                    print(f"Progress: {processed}/{total_users} users processed")
+
+        print(f"Finished updating {total_users} users")
+
+        return new_users, updated_users
+
+    def _do_reddit_migration(
+        self,
+        reddit_bot,
+        reddit_db,
+        community_id: str,
+    ) -> Dict[str, Union[int, str]]:
+        """Internal method that performs the actual Reddit migration."""
         total_submissions = 0
         total_comments = 0
         skipped_submissions = 0
         skipped_comments = 0
         user_xp_map = {}  # Maps user_id to accumulated XP
 
+        original_git_enabled = database.GIT_ENABLED
         database.GIT_ENABLED = False  # Disable Git for this operation
 
         print("Starting Reddit ranks migration")
@@ -473,126 +616,51 @@
                 print(f"Processing {len(submissions_table.all())} submissions")
 
                 for submission in submissions_table.all():
-                    author_name = submission.get('author')
-                    if not author_name or author_name == "[deleted]":
-                        skipped_submissions += 1
-                        continue
-
                     try:
-                        try:
-                            author = reddit_bot.fetch_user(name=author_name)
-                        except Exception as e:
-                            print(f"Error fetching author '{author_name}' for submission: {type(e).__name__}: {e}")
+                        success, skipped = self._process_reddit_item(
+                            reddit_bot, submission, 'submission', user_xp_map
+                        )
+                        if success:
+                            total_submissions += 1
+                        elif skipped:
                             skipped_submissions += 1
-                            continue
-
-                        # Skip submissions without valid author
-                        if not author or not hasattr(author, 'id'):
-                            print(f"Invalid author object for '{author_name}', skipping submission")
-                            skipped_submissions += 1
-                            continue
-
-                        total_submissions += 1
-
-                        # Award random XP in range
-                        xp_gain = random.randint(150, 250)
-
-                        if author.id not in user_xp_map:
-                            user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name}
-
-                        user_xp_map[author.id]['xp'] += xp_gain
-                        user_xp_map[author.id]['submissions'] += 1
                     except Exception as e:
+                        author_name = submission.get('author', 'unknown')
                         print(f"Unexpected error processing submission by '{author_name}': {type(e).__name__}: {e}")
                         skipped_submissions += 1
-                        continue
 
                 # Process comments
                 comments_table = db.table('comments')
                 print(f"Processing {len(comments_table.all())} comments")
 
                 for comment in comments_table.all():
-                    author_name = comment.get('author')
-                    if not author_name or author_name == "[deleted]":
-                        skipped_comments += 1
-                        continue
-
                     try:
-                        try:
-                            author = reddit_bot.fetch_user(name=author_name)
-                        except Exception as e:
-                            print(f"Error fetching author '{author_name}' for comment: {type(e).__name__}: {e}")
+                        success, skipped = self._process_reddit_item(
+                            reddit_bot, comment, 'comment', user_xp_map
+                        )
+                        if success:
+                            total_comments += 1
+                        elif skipped:
                             skipped_comments += 1
-                            continue
-
-                        # Skip comments without valid author
-                        if not author or not hasattr(author, 'id'):
-                            print(f"Invalid author object for '{author_name}', skipping comment")
-                            skipped_comments += 1
-                            continue
-
-                        total_comments += 1
-
-                        # Award random XP in range
-                        xp_gain = random.randint(150, 250)
-
-                        if author.id not in user_xp_map:
-                            user_xp_map[author.id] = {'xp': 0, 'submissions': 0, 'comments': 0, 'name': author.name}
-
-                        user_xp_map[author.id]['xp'] += xp_gain
-                        user_xp_map[author.id]['comments'] += 1
                     except Exception as e:
+                        author_name = comment.get('author', 'unknown')
                         print(f"Unexpected error processing comment by '{author_name}': {type(e).__name__}: {e}")
                         skipped_comments += 1
-                        continue
 
-            # Now update the rank database with accumulated XP
-            total_users = len(user_xp_map)
-            print(f"Updating {total_users} users in rank database")
-
-            for user_id, stats in user_xp_map.items():
-                database.GIT_ENABLED = False  # set this on every iteration in case it was enabled somewhere else
-
-                # Get existing user data or create new
-                user_data = self.db.get_user_data(
-                    platform='reddit',
-                    community_id=community_id,
-                    user_id=user_id,
-                    create_if_not_exists=True,
-                )
-
-                if user_data.get('xp', 0) > 0:
-                    # User already has XP
-                    updated_users += 1
-                else:
-                    # New user or user with no XP
-                    new_users += 1
-
-                # Update user with imported data
-                user_data['xp'] = stats['xp']
-                user_data['message_count'] = stats['submissions'] + stats['comments']
-                user_data['submission_count'] = stats['submissions']
-                user_data['comment_count'] = stats['comments']
-                user_data['username'] = stats['name']
-                user_data['reddit_import_date'] = datetime.now(UTC).isoformat()
-
-                self.db.update_user_data(
-                    platform='reddit',
-                    community_id=community_id,
-                    user_id=user_id,
-                    data=user_data,
-                )
+            # Update the rank database
+            new_users, updated_users = self._update_reddit_rank_database(community_id, user_xp_map)
 
         except Exception as e:
             print(f"Error during Reddit migration: {type(e).__name__}: {e}")
-            # Re-raise after enabling Git
-            database.GIT_ENABLED = True
+            database.GIT_ENABLED = original_git_enabled
             raise
 
-        database.GIT_ENABLED = True  # Re-enable Git after operation
+        # Restore original Git state and force one final sync
+        database.GIT_ENABLED = original_git_enabled
+        self.db.sync()
 
         stats = {
-            'total_users': total_users,
+            'total_users': len(user_xp_map),
             'new_users': new_users,
             'updated_users': updated_users,
             'total_submissions': total_submissions,
@@ -621,72 +689,176 @@ async def migrate_from_mee6(self, guild_id: int) -> Dict[str, Union[int, str]]:
         Dict[str, Union[int, str]]
             Migration statistics
         """
+        import asyncio
+
+        # Use async-friendly locking
+        loop = asyncio.get_running_loop()
+
+        # Acquire lock in a thread-safe way for async
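+        # (threading.Lock.acquire would otherwise block the event loop, so
+        # it is dispatched to the default executor and awaited.)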
+        await loop.run_in_executor(None, MIGRATION_LOCK.acquire)
+
+        try:
+            return await self._do_mee6_migration(guild_id)
+        finally:
+            MIGRATION_LOCK.release()
+
+    async def _fetch_mee6_page(
+        self,
+        session: aiohttp.ClientSession,
+        guild_id: int,
+        page: int,
+    ) -> Optional[list]:
+        """
+        Fetch a single page of Mee6 data.
+
+        Parameters
+        ----------
+        session : aiohttp.ClientSession
+            HTTP session
+        guild_id : int
+            Discord guild ID
+        page : int
+            Page number to fetch
+
+        Returns
+        -------
+        Optional[list]
+            List of player data, or None if the fetch failed
+        """
+        url = f"https://mee6.xyz/api/plugins/levels/leaderboard/{guild_id}?page={page}"
+        print(f"Fetching Mee6 data from: {url}")
+
+        try:
+            async with session.get(url, timeout=10) as response:
+                if response.status != 200:
+                    print(f"Received status code {response.status}, stopping migration")
+                    return None
+
+                data = await response.json()
+
+                if not data.get('players') or len(data['players']) == 0:
+                    print("No more players found, stopping migration")
+                    return None
+
+                player_count = len(data['players'])
+                print(f"Processing {player_count} players from page {page}")
+                return data['players']
+
+        except aiohttp.ClientError as e:
+            print(f"HTTP error during migration: {e}")
+            return None
+        except Exception as e:
+            print(f"Unexpected error during migration: {e}")
+            import traceback
+            traceback.print_exc()
+            return None
+
+    def _process_mee6_batch(
+        self,
+        guild_id: int,
+        batch: list,
+    ) -> Tuple[int, int]:
+        """
+        Process a batch of Mee6 user updates.
+
+        Parameters
+        ----------
+        guild_id : int
+            Discord guild ID
+        batch : list
+            List of user update dictionaries
+
+        Returns
+        -------
+        Tuple[int, int]
+            (new_users, updated_users)
+        """
+        new_users = 0
+        updated_users = 0
+
+        with self.db as db:
+            table = db.table('discord_users')
+
+            # Build a lookup map of existing users in this batch
+            existing_users_map = {}
+            for item in table.all():
+                if item.get('community_id') == guild_id:
+                    existing_users_map[item.get('user_id')] = item
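+            # (Note: this rescans the full table for every batch; if guilds
+            # grow large, the map could be built once before the batch loop.)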
+
+            for user_update in batch:
+                user_id = user_update['user_id']
+                existing = existing_users_map.get(user_id)
+
+                if existing and existing.get('xp', 0) > 0:
+                    updated_users += 1
+                else:
+                    new_users += 1
+
+                # Prepare user data
+                user_data = {
+                    'user_id': user_id,
+                    'community_id': guild_id,
+                    'xp': user_update['xp'],
+                    'message_count': user_update['message_count'],
+                    'username': user_update['username'],
+                    'mee6_import_date': user_update['mee6_import_date'],
+                }
+
+                # Update or insert
+                if existing:
+                    table.update(user_data, doc_ids=[existing.doc_id])
+                else:
+                    table.insert(user_data)
+
+        return new_users, updated_users
+
+    async def _do_mee6_migration(self, guild_id: int) -> Dict[str, Union[int, str]]:
+        """Internal method that performs the actual Mee6 migration."""
         page = 0
         total_users = 0
         new_users = 0
         updated_users = 0
+        batch_updates = []  # Collect all updates before applying
 
+        original_git_enabled = database.GIT_ENABLED
         database.GIT_ENABLED = False  # Disable Git for this operation
 
         async with aiohttp.ClientSession() as session:
             while True:
-                url = f"https://mee6.xyz/api/plugins/levels/leaderboard/{guild_id}?page={page}"
-                print(f"Fetching Mee6 data from: {url}")
-
-                try:
-                    # Add timeout to prevent hanging
-                    async with session.get(url, timeout=10) as response:
-                        if response.status != 200:
-                            print(f"Received status code {response.status}, stopping migration")
-                            break
-
-                        data = await response.json()
-
-                        if not data.get('players') or len(data['players']) == 0:
-                            print("No more players found, stopping migration")
-                            break
-
-                        player_count = len(data['players'])
-                        print(f"Processing {player_count} players from page {page}")
-
-                        for player in data['players']:
-                            database.GIT_ENABLED = False  # set this on every loop in case it's updated in another place
-
-                            total_users += 1
-                            user_id = int(player['id'])
-
-                            # Get existing user data or create new
-                            user_data = self.db.get_user_data(
-                                platform='discord',
-                                community_id=guild_id,
-                                user_id=user_id,
-                                create_if_not_exists=True,
-                            )
-
-                            if user_data.get('xp', 0) > 0:
-                                # User already has XP, skip or update as needed
-                                updated_users += 1
-                                continue
-
-                            # Update user with imported data
-                            user_data['xp'] = player['xp']
-                            user_data['message_count'] = player.get('message_count', 0)
-                            user_data['username'] = player.get('username', f"User {user_id}")
-                            user_data['mee6_import_date'] = datetime.now(UTC).isoformat()
-
-                            self.db.update_user_data('discord', guild_id, user_id, user_data)
-                            new_users += 1
-
-                except aiohttp.ClientError as e:
-                    print(f"HTTP error during migration: {e}")
-                    break
-                except Exception as e:
-                    print(f"Unexpected error during migration: {e}")
+                players = await self._fetch_mee6_page(session, guild_id, page)
+                if players is None:
                     break
 
+                for player in players:
+                    total_users += 1
+                    user_id = int(player['id'])
+
+                    # Collect update data without writing yet
+                    user_update = {
+                        'user_id': user_id,
+                        'xp': player['xp'],
+                        'message_count': player.get('message_count', 0),
+                        'username': player.get('username', f"User {user_id}"),
+                        'mee6_import_date': datetime.now(UTC).isoformat(),
+                    }
+                    batch_updates.append(user_update)
+
                 page += 1
 
-        database.GIT_ENABLED = True  # Re-enable Git after operation
+        # Now apply all updates in batches
+        print(f"Applying {len(batch_updates)} user updates in batches")
+        BATCH_SIZE = 100
+
+        for i in range(0, len(batch_updates), BATCH_SIZE):
+            batch = batch_updates[i:i + BATCH_SIZE]
+            batch_new, batch_updated = self._process_mee6_batch(guild_id, batch)
+            new_users += batch_new
+            updated_users += batch_updated
+            print(f"Completed batch {i // BATCH_SIZE + 1}/{(len(batch_updates) - 1) // BATCH_SIZE + 1}")
+
+        # Restore original Git state and force one final sync
+        database.GIT_ENABLED = original_git_enabled
+        self.db.sync()
 
         stats = {
             'total_processed': total_users,