diff --git a/cogs/economy.py b/cogs/economy.py index 7fb7654a..d5d2cd1b 100644 --- a/cogs/economy.py +++ b/cogs/economy.py @@ -9,6 +9,7 @@ import utils.logger import asyncio import framework.isobot.currency +import framework.isobot.db from random import randint from discord import option, ApplicationContext from discord.ext import commands @@ -34,6 +35,7 @@ def get_item_names(self) -> list: wdir = os.getcwd() color = discord.Color.random() currency = framework.isobot.currency.CurrencyAPI("database/currency.json", "logs/currency.log") +levels = framework.isobot.db.Levels("database/levels.json", None) shop_data = ShopData(f"{wdir}/config/shop.json") all_item_ids = shop_data.get_item_ids() jobs = [ @@ -467,12 +469,12 @@ async def work_list(self, ctx: ApplicationContext): @commands.cooldown(1, 1800, commands.BucketType.user) async def work_select(self, ctx: ApplicationContext, job: str): if job not in jobs: return await ctx.respond(f"This job does not exist. What kind of a job is even {job}??", ephemeral=True) - if job == "YouTuber" and get_level(ctx.author.id) < 3: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) - elif job == "Streamer" and get_level(ctx.author.id) < 5: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) - elif job == "Developer" and get_level(ctx.author.id) < 10: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) - elif job == "Scientist" and get_level(ctx.author.id) < 20: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) - elif job == "Engineer" and get_level(ctx.author.id) < 25: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) - elif job == "Doctor" and get_level(ctx.author.id) < 40: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + if job == "YouTuber" and levels.get_level(ctx.author.id) < 3: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + elif job == "Streamer" and levels.get_level(ctx.author.id) < 5: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + elif job == "Developer" and levels.get_level(ctx.author.id) < 10: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + elif job == "Scientist" and levels.get_level(ctx.author.id) < 20: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + elif job == "Engineer" and levels.get_level(ctx.author.id) < 25: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) + elif job == "Doctor" and levels.get_level(ctx.author.id) < 40: return await ctx.respond("You currently do not have the required level to perform this job!", ephemeral=True) userdat[str(ctx.author.id)]["work_job"] = job save() localembed = discord.Embed(title="New job!", description=f"You are now working as a {job}!") diff --git a/cogs/levelling.py b/cogs/levelling.py index 4b134f82..77cc4fd4 100644 --- a/cogs/levelling.py +++ b/cogs/levelling.py @@ -4,24 +4,26 @@ import discord import json import os.path +import framework.isobot.db from discord import option, ApplicationContext from discord.ext import commands # Variables wdir = os.getcwd() color = discord.Color.random() +levels = framework.isobot.db.Levels(f"{wdir}/database/levels.json", None) -with open(f"{wdir}/database/levels.json", 'r', encoding="utf-8") as f: levels = json.load(f) +# with open(f"{wdir}/database/levels.json", 'r', encoding="utf-8") as f: levels = json.load(f) -def save(): - with open(f"{wdir}/database/levels.json", 'w+', encoding="utf-8") as f: json.dump(levels, f, indent=4) +# def save(): +# with open(f"{wdir}/database/levels.json", 'w+', encoding="utf-8") as f: json.dump(levels, f, indent=4) # Functions -def get_xp(id: int) -> int: - return levels[str(id)]["xp"] +# def get_xp(id: int) -> int: +# return levels[str(id)]["xp"] -def get_level(id: int) -> int: - return levels[str(id)]["level"] +# def get_level(id: int) -> int: +# return levels[str(id)]["level"] # Commands class Levelling(commands.Cog): @@ -37,8 +39,8 @@ async def rank(self, ctx: ApplicationContext, user:discord.User=None): if user is None: user = ctx.author try: localembed = discord.Embed(title=f"{user.display_name}'s rank", color=color) - localembed.add_field(name="Level", value=levels[str(user.id)]["level"]) - localembed.add_field(name="XP", value=levels[str(user.id)]["xp"]) + localembed.add_field(name="Level", value=levels.get_level(user.id)) + localembed.add_field(name="XP", value=levels.get_xp(user.id)) localembed.set_footer(text="Keep chatting to earn levels!") await ctx.respond(embed = localembed) except KeyError: return await ctx.respond("Looks like that user isn't indexed yet. Try again later.", ephemeral=True) @@ -52,8 +54,8 @@ async def rank(self, ctx: ApplicationContext, user:discord.User=None): async def edit_rank(self, ctx: ApplicationContext, user:discord.User, new_rank:int): if ctx.author.id != 738290097170153472: return await ctx.respond("This command isn't for you.", ephemeral=True) try: - levels[str(user.id)]["level"] = new_rank - await ctx.respond(f"{user.display_name}\'s rank successfully edited. `New Rank: {levels[str(user.id)]['level']}`") + levels.edit_level(user.id, new_rank) + await ctx.respond(f"{user.display_name}\'s rank successfully edited. `New Rank: {levels.get_level(user.id)}`") except KeyError: return await ctx.respond("That user isn't indexed yet.", ephemeral=True) @commands.slash_command( @@ -65,8 +67,8 @@ async def edit_rank(self, ctx: ApplicationContext, user:discord.User, new_rank:i async def edit_xp(self, ctx: ApplicationContext, user:discord.User, new_xp:int): if ctx.author.id != 738290097170153472: return await ctx.respond("This command isn't for you.", ephemeral=True) try: - levels[str(user.id)]["xp"] = new_xp - await ctx.respond(f"{user.display_name}\'s XP count successfully edited. `New XP: {levels[str(user.id)]['xp']}`") + levels.edit_xp(user.id, new_xp) + await ctx.respond(f"{user.display_name}\'s XP count successfully edited. `New XP: {levels.get_xp(user.id)}`") except KeyError: return await ctx.respond("That user isn't indexed yet.", ephemeral=True) @commands.slash_command( @@ -76,7 +78,7 @@ async def edit_xp(self, ctx: ApplicationContext, user:discord.User, new_xp:int): async def leaderboard_levels(self, ctx: ApplicationContext): levels_dict = dict() for person in levels: - levels_dict[str(person)] = levels[str(person)]["level"] + levels_dict[str(person)] = levels.get_level(person) undicted_leaderboard = sorted(levels_dict.items(), key=lambda x:x[1], reverse=True) dicted_leaderboard = dict(undicted_leaderboard) parsed_output = str() diff --git a/cogs/utils.py b/cogs/utils.py index 5cfa9c87..f68fb0a0 100644 --- a/cogs/utils.py +++ b/cogs/utils.py @@ -8,9 +8,9 @@ import openai import framework.isobot.embedengine import framework.isobot.currency +import framework.isobot.db from discord import option, ApplicationContext from discord.ext import commands -from cogs.levelling import get_level, get_xp from cogs.afk import get_presence # Variables @@ -18,6 +18,7 @@ currency = framework.isobot.currency.CurrencyAPI("database/currency.json", "logs/currency.log") openai.api_key = os.getenv("chatgpt_API_KEY") chatgpt_conversation = dict() +levels = framework.isobot.db.Levels("database/levels.json", None) # Commands class Utils(commands.Cog): @@ -95,7 +96,7 @@ async def profile(self, ctx: ApplicationContext, user: discord.User = None): if user is None: user = ctx.author localembed = discord.Embed(title=f"{user.display_name}'s isobot stats", color=color) localembed.set_thumbnail(url=user.avatar) - localembed.add_field(name="Level", value=f"Level {get_level(user.id)} ({get_xp(user.id)} XP)", inline=False) + localembed.add_field(name="Level", value=f"Level {levels.get_level(user.id)} ({levels.get_xp(user.id)} XP)", inline=False) localembed.add_field(name="Balance in Wallet", value=f"{currency.get_wallet(user.id)} coins", inline=True) localembed.add_field(name="Balance in Bank Account", value=f"{currency.get_bank(user.id)} coins", inline=True) localembed.add_field(name="Net-Worth", value=f"{currency.get_user_networth(user.id)} coins", inline=True) diff --git a/cogs/weather.py b/cogs/weather.py index 5f94b64e..41308ed1 100644 --- a/cogs/weather.py +++ b/cogs/weather.py @@ -3,17 +3,13 @@ import json import requests import os +import framework.isobot.db from discord import ApplicationContext, option from discord.ext import commands # Variables api_key = os.environ['openweathermap_API_KEY'] -with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) - -# Functions -def save(): - """Dumps all cached databases to storage.""" - with open("database/weather.json", 'w+', encoding="utf-8") as f: json.dump(user_db, f, indent=4) +weather_db = framework.isobot.db.Weather("database/weather.json", None) # Commands class Weather(commands.Cog): @@ -26,13 +22,12 @@ def __init__(self, bot): ) @option(name="location", description="What location do you want to set?", type=str) async def weather_set_location(self, ctx: ApplicationContext, location: str): - if str(ctx.author.id) not in user_db: user_db[str(ctx.author.id)] = {"location": None, "scale": "Celsius"} + weather_db.new(ctx.author.id) test_ping = requests.get(f"https://api.openweathermap.org/data/2.5/weather?q={location}&appid={api_key}").content test_ping_json = json.loads(test_ping) if test_ping_json["cod"] == '404': return await ctx.respond(":warning: This location does not exist.", ephemeral=True) else: - user_db[str(ctx.author.id)]["location"] = location.lower() - save() + weather_db.set_default_location(ctx.author.id, location.lower()) localembed = discord.Embed(description="Your default location has been updated.", color=discord.Color.green()) await ctx.respond(embed=localembed) @@ -42,10 +37,9 @@ async def weather_set_location(self, ctx: ApplicationContext, location: str): ) @option(name="scale", description="Which scale do you want to use?", type=str, choices=["Celsius", "Fahrenheit", "Kelvin"]) async def weather_set_scale(self, ctx: ApplicationContext, scale: str): - if str(ctx.author.id) not in user_db: user_db[str(ctx.author.id)] = {"location": None, "scale": "Celsius"} + weather_db.new(ctx.author.id) if scale not in ["Celsius", "Fahrenheit", "Kelvin"]: return 1 - user_db[str(ctx.author.id)]["scale"] = scale - save() + weather_db.set_scale(ctx.author.id, scale) localembed = discord.Embed(description="Your preferred unit scale has been updated.", color=discord.Color.green()) await ctx.respond(embed=localembed) @@ -55,10 +49,10 @@ async def weather_set_scale(self, ctx: ApplicationContext, scale: str): ) @option(name="location", description="The location you want weather info about (leave empty for set location)", type=str, default=None) async def weather(self, ctx: ApplicationContext, location: str = None): - if str(ctx.author.id) not in user_db: user_db[str(ctx.author.id)] = {"location": None, "scale": "Celsius"} + weather_db.new(ctx.author.id) if location == None: - if user_db[str(ctx.author.id)]["location"] == None: return await ctx.respond("You do not have a default location set yet.\nEnter a location name and try again.", ephemeral=True) - else: location = user_db[str(ctx.author.id)]["location"] + if weather_db.get_location(ctx.author.id) == None: return await ctx.respond("You do not have a default location set yet.\nEnter a location name and try again.", ephemeral=True) + else: location = weather_db.get_default_location(ctx.author.id) location = location.replace(" ", "%20") api_request = requests.get(f"https://api.openweathermap.org/data/2.5/weather?q={location}&appid={api_key}").content req: dict = json.loads(api_request) @@ -71,11 +65,11 @@ async def weather(self, ctx: ApplicationContext, location: str = None): # Stripped API request data loc_name = req["name"] - if user_db[str(ctx.author.id)]["scale"] == "Celsius": + if weather_db.get_scale(ctx.author.id) == "Celsius": temp = round(req["main"]["temp"] - 273) temp_max = round(req["main"]["temp_max"] - 273) temp_min = round(req["main"]["temp_min"] - 273) - elif user_db[str(ctx.author.id)]["scale"] == "Fahrenheit": + elif weather_db.get_scale(ctx.author.id) == "Fahrenheit": temp = round(((req["main"]["temp"] - 273) * 9/5) + 32) temp_max = round(((req["main"]["temp_max"] - 273) * 9/5) + 32) temp_min = round(((req["main"]["temp_min"] - 273) * 9/5) + 32) @@ -94,8 +88,8 @@ async def weather(self, ctx: ApplicationContext, location: str = None): description=f"**{forcast}**\n{forcast_description}", color=discord.Color.blue() ) - if user_db[str(ctx.author.id)]["scale"] == "Celsius": localembed.add_field(name="Temperature", value=f"**{temp}C** (max: {temp_max}C, min: {temp_min}C)") - elif user_db[str(ctx.author.id)]["scale"] == "Fahrenheit": localembed.add_field(name="Temperature", value=f"**{temp}F** (max: {temp_max}F, min: {temp_min}F)") + if weather_db.get_scale(ctx.author.id) == "Celsius": localembed.add_field(name="Temperature", value=f"**{temp}C** (max: {temp_max}C, min: {temp_min}C)") + elif weather_db.get_scale(ctx.author.id) == "Fahrenheit": localembed.add_field(name="Temperature", value=f"**{temp}F** (max: {temp_max}F, min: {temp_min}F)") else: localembed.add_field(name="Temperature", value=f"**{temp}K** (max: {temp_max}K, min: {temp_min}K)") localembed.add_field(name="Humidity", value=f"{humidity}%") localembed.add_field(name="Sunrise", value=f"", inline=False) diff --git a/framework/isobot/db.py b/framework/isobot/db.py new file mode 100644 index 00000000..b1c9b08c --- /dev/null +++ b/framework/isobot/db.py @@ -0,0 +1,152 @@ +"""A module to manage the databases in isobot.""" + +# Imports +import json +from discord import User +import datetime + +def get_time(): + """Fetches and returns the current time.""" + return datetime.datetime.now().strftime("%H:%M:%S") + +class Colors: + """Contains general stdout colors.""" + cyan = '\033[96m' + red = '\033[91m' + green = '\033[92m' + end = '\033[0m' + +class Items(Colors): + """Class to interact the items db""" + def __init__(self, db_path: str, log_path: str): + self.db_path = db_path + self.log_path = log_path + print(f"[Framework/Loader] {Colors.green}Items database initialized.{Colors.end}") + + def new(self, user_id: User) -> int: + with open(self.db_path, 'r') as f: items = json.load(f) + if str(user_id) not in items: items[str(user_id)] = {} + with open("config/shop.json", 'r') as f: shopitem = json.load(f) + for z in shopitem.keys(): + if z in items[str(user_id)]: pass + else: items[str(user_id)][str(z)] = 0 + with open(self.db_path, 'w+') as f: json.dump(items, f, indent=4) + return 0 + + def add_item(self, user_id: User, item: str, quantity: int) -> int: + with open(self.db_path, 'r') as f: items = json.load(f) + items[str(user_id)][item] += quantity + with open(self.db_path, 'w+') as f: json.dump(items, f, indent=4) + return 0 + + def remove_item(self, user_id: User, item: str, quantity: int) -> int: + with open(self.db_path, 'r') as f: items = json.load(f) + items[str(user_id)][item] -= quantity + with open(self.db_path, 'w+') as f: json.dump(items, f, indent=4) + return 0 + +class Levels(Colors): + """Class to interact the levels db""" + def __init__(self, db_path: str, log_path: str): + self.db_path = db_path + self.log_path = log_path + print(f"[Framework/Loader] {Colors.green}Levels database initialized.{Colors.end}") + + def new(self, user_id: User) -> int: + with open(self.db_path, 'r') as f: levels = json.load(f) + if str(user_id) not in levels: + levels[str(user_id)] = { + "xp": 0, + "level": 0 + } + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def get_level(self, user_id: User) -> int: + with open(self.db_path, 'r') as f: levels = json.load(f) + return levels[str(user_id)]["level"] + + def get_xp(self, user_id: User) -> int: + with open(self.db_path, 'r') as f: levels = json.load(f) + return levels[str(user_id)]["xp"] + + def add_levels(self, user_id: User, level: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["level"] += level + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def remove_levels(self, user_id: User, level: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["level"] += level + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def reset_level(self, user_id: User): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["level"] = 0 + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def add_xp(self, user_id: User, xp: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["xp"] += xp + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def remove_xp(self, user_id: User, xp: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["xp"] += xp + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def reset_xp(self, user_id: User): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["xp"] = 0 + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def edit_level(self, user_id: User, level: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["level"] = level + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + + def edit_xp(self, user_id: User, xp: int): + with open(self.db_path, 'r') as f: levels = json.load(f) + levels[str(user_id)]["xp"] = xp + with open(self.db_path, 'w+') as f: json.dump(levels, f, indent=4) + return 0 + +class Weather(Colors): + """Class to manage the weather db""" + def __init__(self, db_path: str, log_path: str): + self.db_path = db_path + self.log_path = log_path + print(f"[Framework/Loader] {Colors.green}Weather database initialized.{Colors.end}") + + def new(self, user_id: User): + with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) + if str(user_id) not in user_db: user_db[str(user_id)] = {"location": None, "scale": "Celsius"} + with open("database/weather.json", 'w+', encoding="utf-8") as f: json.dump(user_db, f, indent=4) + return 0 + + def set_scale(self, user_id: User, scale: str) -> int: + with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) + user_db[str(user_id)]["scale"] = scale + with open("database/weather.json", 'w+', encoding="utf-8") as f: json.dump(user_db, f, indent=4) + return 0 + + def set_default_location(self, user_id: User, location: str) -> int: + with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) + user_db[str(user_id)]["location"] = location + with open("database/weather.json", 'w+', encoding="utf-8") as f: json.dump(user_db, f, indent=4) + return 0 + + def get_scale(self, user_id: User): + with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) + return user_db[str(user_id)]["scale"] + + def get_default_location(self, user_id: User): + with open("database/weather.json", 'r', encoding="utf-8") as f: user_db = json.load(f) + return user_db[str(user_id)]["location"] diff --git a/main.py b/main.py index 54cefd2b..2ce42fe7 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ import framework.isobank.authorize import framework.isobank.manager import framework.isobot.embedengine +import framework.isobot.db from discord import ApplicationContext, option from discord.ext import commands from discord.ext.commands import * @@ -32,19 +33,14 @@ client = discord.Bot() color = discord.Color.random() wdir = os.getcwd() -with open('database/items.json', 'r', encoding="utf-8") as f: items = json.load(f) -with open('config/shop.json', 'r', encoding="utf-8") as f: shopitem = json.load(f) with open('database/presence.json', 'r', encoding="utf-8") as f: presence = json.load(f) -with open('database/levels.json', 'r', encoding="utf-8") as f: levels = json.load(f) with open('config/commands.json', 'r', encoding="utf-8") as f: commandsdb = json.load(f) with open('database/automod.json', 'r', encoding="utf-8") as f: automod_config = json.load(f) cmd_list = commandsdb.keys() #Pre-Initialization Commands def save(): - with open('database/items.json', 'w+', encoding="utf-8") as f: json.dump(items, f, indent=4) with open('database/presence.json', 'w+', encoding="utf-8") as f: json.dump(presence, f, indent=4) - with open('database/levels.json', 'w+', encoding="utf-8") as f: json.dump(levels, f, indent=4) with open('database/automod.json', 'w+', encoding="utf-8") as f: json.dump(automod_config, f, indent=4) if not os.path.isdir("logs"): @@ -63,6 +59,8 @@ def save(): #Framework Module Loader colors = framework.isobot.colors.Colors() currency = framework.isobot.currency.CurrencyAPI("database/currency.json", "logs/currency.log") +items = framework.isobot.currency.Items("database/items.json", None) +levels = framework.isobot.currency.Levels("database/levels.json", None) # Theme Loader themes = False # True: enables themes; False: disables themes; @@ -101,8 +99,8 @@ async def on_message(ctx): currency.new_bank(ctx.author.id) create_isocoin_key(ctx.author.id) new_userdat(ctx.author.id) - if str(ctx.author.id) not in items: items[str(ctx.author.id)] = {} - if str(ctx.author.id) not in levels: levels[str(ctx.author.id)] = {"xp": 0, "level": 0} + items.new(ctx.author.id) + levels.new(ctx.author.id) if str(ctx.guild.id) not in automod_config: automod_config[str(ctx.guild.id)] = { "swear_filter": { @@ -114,9 +112,6 @@ async def on_message(ctx): } } } - for z in shopitem: - if z in items[str(ctx.author.id)]: pass - else: items[str(ctx.author.id)][str(z)] = 0 save() uList = list() if str(ctx.guild.id) in presence: @@ -133,15 +128,15 @@ async def on_message(ctx): await asyncio.sleep(5) await m1.delete() if not ctx.author.bot: - levels[str(ctx.author.id)]["xp"] += randint(1, 5) + levels.add_xp(ctx.author.id, randint(1, 5)) xpreq = 0 - for level in range(int(levels[str(ctx.author.id)]["level"])): + for level in range(int(levels.get_level(ctx.author.id))): xpreq += 50 if xpreq >= 5000: break - if levels[str(ctx.author.id)]["xp"] >= xpreq: - levels[str(ctx.author.id)]["xp"] = 0 - levels[str(ctx.author.id)]["level"] += 1 - await ctx.author.send(f"{ctx.author.mention}, you just ranked up to **level {levels[str(ctx.author.id)]['level']}**. Nice!") + if levels.get_xp(ctx.author.id) >= xpreq: + levels.reset_xp(ctx.author.id) + levels.add_levels(ctx.author.id, 1) + await ctx.author.send(f"{ctx.author.mention}, you just ranked up to **level {levels.get_level(ctx.author.id)}**. Nice!") save() if automod_config[str(ctx.guild.id)]["swear_filter"]["enabled"] == True: if automod_config[str(ctx.guild.id)]["swear_filter"]["keywords"]["use_default"] and any(x in ctx.content.lower() for x in automod_config[str(ctx.guild.id)]["swear_filter"]["keywords"]["default"]):