From 386f15a0ff871d9edef24361b17cb6d402b49c06 Mon Sep 17 00:00:00 2001 From: snipe <72265661+notsniped@users.noreply.github.com> Date: Thu, 29 Dec 2022 08:09:02 +0530 Subject: [PATCH] Remove discord_slash library --- discord_slash/__init__.py | 15 - discord_slash/client.py | 1688 ---------------------- discord_slash/cog_ext.py | 291 ---- discord_slash/const.py | 5 - discord_slash/context.py | 776 ---------- discord_slash/dpy_overrides.py | 326 ----- discord_slash/error.py | 96 -- discord_slash/http.py | 208 --- discord_slash/model.py | 715 --------- discord_slash/utils/__init__.py | 9 - discord_slash/utils/manage_commands.py | 405 ------ discord_slash/utils/manage_components.py | 323 ----- 12 files changed, 4857 deletions(-) delete mode 100644 discord_slash/__init__.py delete mode 100644 discord_slash/client.py delete mode 100644 discord_slash/cog_ext.py delete mode 100644 discord_slash/const.py delete mode 100644 discord_slash/context.py delete mode 100644 discord_slash/dpy_overrides.py delete mode 100644 discord_slash/error.py delete mode 100644 discord_slash/http.py delete mode 100644 discord_slash/model.py delete mode 100644 discord_slash/utils/__init__.py delete mode 100644 discord_slash/utils/manage_commands.py delete mode 100644 discord_slash/utils/manage_components.py diff --git a/discord_slash/__init__.py b/discord_slash/__init__.py deleted file mode 100644 index ebbfda06..00000000 --- a/discord_slash/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -discord-py-slash-command -~~~~~~~~~~~~~~~~~~~~~~~~ -Simple Discord Slash Command extension for discord.py -:copyright: (c) 2020-2021 eunwoo1104 -:license: MIT -""" - -from .client import SlashCommand # noqa: F401 -from .const import __version__ # noqa: F401 -from .context import ComponentContext, MenuContext, SlashContext # noqa: F401 -from .dpy_overrides import ComponentMessage # noqa: F401 -from .model import ButtonStyle, ComponentType, ContextMenuType, SlashCommandOptionType # noqa: F401 -from .utils import manage_commands # noqa: F401 -from .utils import manage_components # noqa: F401 diff --git a/discord_slash/client.py b/discord_slash/client.py deleted file mode 100644 index b0ebfa31..00000000 --- a/discord_slash/client.py +++ /dev/null @@ -1,1688 +0,0 @@ -import copy -import logging -import re -import typing -import warnings -from contextlib import suppress -from inspect import getdoc, iscoroutinefunction - -import discord -from discord.ext import commands - -from . import context, error, http, model -from .utils import manage_commands -from .utils.manage_components import get_components_ids, get_messages_ids - - -def _get_val(d: dict, key): # util function to get value from dict with fallback to None key - try: - value = d[key] - except KeyError: # if there is no specific key set, we fallback to "global/any" - value = d[None] - return value - - -class SlashCommand: - """ - Slash command handler class. - - :param client: discord.py Client or Bot instance. - :type client: Union[discord.Client, discord.ext.commands.Bot] - :param sync_commands: Whether to sync commands automatically. Default `False`. - :type sync_commands: bool - :param debug_guild: Guild ID of guild to use for testing commands. Prevents setting global commands in favor of guild commands, which update instantly - :type debug_guild: int - :param delete_from_unused_guilds: If the bot should make a request to set no commands for guilds that haven't got any commands registered in :class:``SlashCommand``. Default `False`. - :type delete_from_unused_guilds: bool - :param sync_on_cog_reload: Whether to sync commands on cog reload. Default `False`. - :type sync_on_cog_reload: bool - :param override_type: Whether to override checking type of the client and try register event. - :type override_type: bool - :param application_id: The application id of the bot, required only when the application id and bot id are different. (old bots) - :type application_id: int - - .. note:: - If ``sync_on_cog_reload`` is enabled, command syncing will be triggered when :meth:`discord.ext.commands.Bot.reload_extension` - is triggered. - - :ivar _discord: Discord client of this client. - :ivar commands: Dictionary of the registered commands via :func:`.slash` decorator. - :ivar menu_commands: Dictionary of the registered context menus via the :func:`.context_menu` decorator. - :ivar req: :class:`.http.SlashCommandRequest` of this client. - :ivar logger: Logger of this client. - :ivar sync_commands: Whether to sync commands automatically. - :ivar sync_on_cog_reload: Whether to sync commands on cog reload. - :ivar has_listener: Whether discord client has listener add function. - """ - - def __init__( - self, - client: typing.Union[discord.Client, commands.Bot], - sync_commands: bool = False, - debug_guild: typing.Optional[int] = None, - delete_from_unused_guilds: bool = False, - sync_on_cog_reload: bool = False, - override_type: bool = False, - application_id: typing.Optional[int] = None, - ): - self._discord = client - self.commands = {"context": {}} - self.subcommands = {} - self.components = {} - self.logger = logging.getLogger("discord_slash") - self.req = http.SlashCommandRequest(self.logger, self._discord, application_id) - self.sync_commands = sync_commands - self.debug_guild = debug_guild - self.sync_on_cog_reload = sync_on_cog_reload - - if self.sync_commands: - self._discord.loop.create_task(self.sync_all_commands(delete_from_unused_guilds)) - - if ( - not isinstance(client, commands.Bot) - and not isinstance(client, commands.AutoShardedBot) - and not override_type - ): - self.logger.warning( - "Detected discord.Client! It is highly recommended to use `commands.Bot`. Do not add any `on_socket_response` event." - ) - - self._discord.on_socket_response = self.on_socket_response - self.has_listener = False - else: - if not hasattr(self._discord, "slash"): - self._discord.slash = self - else: - raise error.DuplicateSlashClient("You can't have duplicate SlashCommand instances!") - - self._discord.add_listener(self.on_socket_response) - self.has_listener = True - default_add_function = self._discord.add_cog - - def override_add_cog(cog: commands.Cog): - default_add_function(cog) - self.get_cog_commands(cog) - - self._discord.add_cog = override_add_cog - default_remove_function = self._discord.remove_cog - - def override_remove_cog(name: str): - cog = self._discord.get_cog(name) - if cog is None: - return - self.remove_cog_commands(cog) - default_remove_function(name) - - self._discord.remove_cog = override_remove_cog - - if self.sync_on_cog_reload: - orig_reload = self._discord.reload_extension - - def override_reload_extension(*args): - orig_reload(*args) - self._discord.loop.create_task( - self.sync_all_commands(delete_from_unused_guilds) - ) - - self._discord.reload_extension = override_reload_extension - - def get_cog_commands(self, cog: commands.Cog): - """ - Gets slash command from :class:`discord.ext.commands.Cog`. - - .. note:: - Since version ``1.0.9``, this gets called automatically during cog initialization. - - :param cog: Cog that has slash commands. - :type cog: discord.ext.commands.Cog - """ - if hasattr(cog, "_slash_registered"): # Temporary warning - return self.logger.warning( - "Calling get_cog_commands is no longer required " - "to add cog slash commands. Make sure to remove all calls to this function." - ) - cog._slash_registered = True # Assuming all went well - func_list = [getattr(cog, x) for x in dir(cog)] - - self._get_cog_slash_commands(cog, func_list) - self._get_cog_component_callbacks(cog, func_list) - - def _get_cog_slash_commands(self, cog, func_list): - res = [ - x - for x in func_list - if isinstance(x, (model.CogBaseCommandObject, model.CogSubcommandObject)) - ] - - for x in res: - x.cog = cog - if isinstance(x, model.CogBaseCommandObject): - if x.name in self.commands: - raise error.DuplicateCommand(x.name) - self.commands[x.name] = x - else: - if x.base in self.commands: - base_command = self.commands[x.base] - for i in x.allowed_guild_ids: - if i not in base_command.allowed_guild_ids: - base_command.allowed_guild_ids.append(i) - - base_permissions = x.base_command_data["api_permissions"] - if base_permissions: - for applicable_guild in base_permissions: - if applicable_guild not in base_command.permissions: - base_command.permissions[applicable_guild] = [] - base_command.permissions[applicable_guild].extend( - base_permissions[applicable_guild] - ) - - self.commands[x.base].has_subcommands = True - - else: - self.commands[x.base] = model.BaseCommandObject(x.base, x.base_command_data) - if x.base not in self.subcommands: - self.subcommands[x.base] = {} - if x.subcommand_group: - if x.subcommand_group not in self.subcommands[x.base]: - self.subcommands[x.base][x.subcommand_group] = {} - if x.name in self.subcommands[x.base][x.subcommand_group]: - raise error.DuplicateCommand(f"{x.base} {x.subcommand_group} {x.name}") - self.subcommands[x.base][x.subcommand_group][x.name] = x - else: - if x.name in self.subcommands[x.base]: - raise error.DuplicateCommand(f"{x.base} {x.name}") - self.subcommands[x.base][x.name] = x - - def _get_cog_component_callbacks(self, cog, func_list): - res = [x for x in func_list if isinstance(x, model.CogComponentCallbackObject)] - - for x in res: - x.cog = cog - self._add_comp_callback_obj(x) - - def remove_cog_commands(self, cog): - """ - Removes slash command from :class:`discord.ext.commands.Cog`. - - .. note:: - Since version ``1.0.9``, this gets called automatically during cog de-initialization. - - :param cog: Cog that has slash commands. - :type cog: discord.ext.commands.Cog - """ - if hasattr(cog, "_slash_registered"): - del cog._slash_registered - func_list = [getattr(cog, x) for x in dir(cog)] - self._remove_cog_slash_commands(func_list) - self._remove_cog_component_callbacks(func_list) - - def _remove_cog_slash_commands(self, func_list): - res = [ - x - for x in func_list - if isinstance(x, (model.CogBaseCommandObject, model.CogSubcommandObject)) - ] - for x in res: - if isinstance(x, model.CogBaseCommandObject): - if x.name not in self.commands: - continue # Just in case it is removed due to subcommand. - if x.name in self.subcommands: - self.commands[x.name].func = None - continue # Let's remove completely when every subcommand is removed. - del self.commands[x.name] - else: - if x.base not in self.subcommands: - continue # Just in case... - if x.subcommand_group: - del self.subcommands[x.base][x.subcommand_group][x.name] - if not self.subcommands[x.base][x.subcommand_group]: - del self.subcommands[x.base][x.subcommand_group] - else: - del self.subcommands[x.base][x.name] - if not self.subcommands[x.base]: - del self.subcommands[x.base] - if x.base in self.commands: - if self.commands[x.base].func: - self.commands[x.base].has_subcommands = False - else: - del self.commands[x.base] - - def _remove_cog_component_callbacks(self, func_list): - res = [x for x in func_list if isinstance(x, model.CogComponentCallbackObject)] - - for x in res: - self.remove_component_callback_obj(x) - - async def to_dict(self): - """ - Converts all commands currently registered to :class:`SlashCommand` to a dictionary. - Returns a dictionary in the format: - - .. code-block:: python - - { - "global" : [], # list of global commands - "guild" : { - 0000: [] # list of commands in the guild 0000 - } - } - - Commands are in the format specified by discord `here `_ - """ - await self._discord.wait_until_ready() # In case commands are still not registered to SlashCommand. - all_guild_ids = [] - for x in self.commands: - if x == "context": - # handle context menu separately. - for _x in self.commands["context"]: - _selected = self.commands["context"][_x] - for i in _selected.allowed_guild_ids: - if i not in all_guild_ids: - all_guild_ids.append(i) - continue - for i in self.commands[x].allowed_guild_ids: - if i not in all_guild_ids: - all_guild_ids.append(i) - cmds = {"global": [], "guild": {x: [] for x in all_guild_ids}} - wait = {} # Before merging to return dict, let's first put commands to temporary dict. - for x in self.commands: - if x == "context": - # handle context menu separately. - for _x in self.commands["context"]: # x is the new reference dict - selected = self.commands["context"][_x] - - if selected.allowed_guild_ids: - for y in selected.allowed_guild_ids: - if y not in wait: - wait[y] = {} - command_dict = { - "name": _x, - "options": selected.options or [], - "default_permission": selected.default_permission, - "permissions": {}, - "type": selected._type, - } - if y in selected.permissions: - command_dict["permissions"][y] = selected.permissions[y] - wait[y][_x] = copy.deepcopy(command_dict) - else: - if "global" not in wait: - wait["global"] = {} - command_dict = { - "name": _x, - "options": selected.options or [], - "default_permission": selected.default_permission, - "permissions": selected.permissions or {}, - "type": selected._type, - } - wait["global"][_x] = copy.deepcopy(command_dict) - - continue - - selected = self.commands[x] - if selected.allowed_guild_ids: - for y in selected.allowed_guild_ids: - if y not in wait: - wait[y] = {} - command_dict = { - "name": x, - "description": selected.description or "No Description.", - "options": selected.options or [], - "default_permission": selected.default_permission, - "permissions": {}, - "type": selected._type, - } - if command_dict["type"] != 1: - command_dict.pop("description") - if y in selected.permissions: - command_dict["permissions"][y] = selected.permissions[y] - wait[y][x] = copy.deepcopy(command_dict) - else: - if "global" not in wait: - wait["global"] = {} - command_dict = { - "name": x, - "description": selected.description or "No Description.", - "options": selected.options or [], - "default_permission": selected.default_permission, - "permissions": selected.permissions or {}, - "type": selected._type, - } - if command_dict["type"] != 1: - command_dict.pop("description") - wait["global"][x] = copy.deepcopy(command_dict) - - # Separated normal command add and subcommand add not to - # merge subcommands to one. More info at Issue #88 - # https://github.com/eunwoo1104/discord-py-slash-command/issues/88 - - for x in self.commands: - if x == "context": - continue # no menus have subcommands. - - if not self.commands[x].has_subcommands: - continue - tgt = self.subcommands[x] - for y in tgt: - sub = tgt[y] - if isinstance(sub, model.SubcommandObject): - _dict = { - "name": sub.name, - "description": sub.description or "No Description.", - "type": model.SlashCommandOptionType.SUB_COMMAND, - "options": sub.options or [], - } - if sub.allowed_guild_ids: - for z in sub.allowed_guild_ids: - wait[z][x]["options"].append(_dict) - else: - wait["global"][x]["options"].append(_dict) - else: - queue = {} - base_dict = { - "name": y, - "description": "No Description.", - "type": model.SlashCommandOptionType.SUB_COMMAND_GROUP, - "options": [], - } - for z in sub: - sub_sub = sub[z] - _dict = { - "name": sub_sub.name, - "description": sub_sub.description or "No Description.", - "type": model.SlashCommandOptionType.SUB_COMMAND, - "options": sub_sub.options or [], - } - if sub_sub.allowed_guild_ids: - for i in sub_sub.allowed_guild_ids: - if i not in queue: - queue[i] = copy.deepcopy(base_dict) - queue[i]["options"].append(_dict) - else: - if "global" not in queue: - queue["global"] = copy.deepcopy(base_dict) - queue["global"]["options"].append(_dict) - for i in queue: - wait[i][x]["options"].append(queue[i]) - - for x in wait: - if x == "global": - [cmds["global"].append(n) for n in wait["global"].values()] - else: - [cmds["guild"][x].append(n) for n in wait[x].values()] - - return cmds - - async def sync_all_commands( - self, delete_from_unused_guilds=False, delete_perms_from_unused_guilds=False - ): - """ - Matches commands registered on Discord to commands registered here. - Deletes any commands on Discord but not here, and registers any not on Discord. - This is done with a `put` request. - A PUT request will only be made if there are changes detected. - If ``sync_commands`` is ``True``, then this will be automatically called. - - :param delete_from_unused_guilds: If the bot should make a request to set no commands for guilds that haven't got any commands registered in :class:``SlashCommand`` - :param delete_perms_from_unused_guilds: If the bot should make a request to clear permissions for guilds that haven't got any permissions registered in :class:``SlashCommand`` - """ - permissions_map = {} - cmds = await self.to_dict() - self.logger.info("Syncing commands...") - # if debug_guild is set, global commands get re-routed to the guild to update quickly - cmds_formatted = {self.debug_guild: cmds["global"]} - for guild in cmds["guild"]: - cmds_formatted[guild] = cmds["guild"][guild] - - for scope in cmds_formatted: - permissions = {} - new_cmds = cmds_formatted[scope] - existing_cmds = await self.req.get_all_commands(guild_id=scope) - existing_by_name = {} - to_send = [] - changed = False - for cmd in existing_cmds: - existing_by_name[cmd["name"]] = model.CommandData(**cmd) - - if len(new_cmds) != len(existing_cmds): - changed = True - - for command in new_cmds: - cmd_name = command["name"] - permissions[cmd_name] = command.pop("permissions") - if cmd_name in existing_by_name: - cmd_data = model.CommandData(**command) - existing_cmd = existing_by_name[cmd_name] - if cmd_data != existing_cmd: - changed = True - to_send.append(command) - else: - command_with_id = command - command_with_id["id"] = existing_cmd.id - to_send.append(command_with_id) - else: - changed = True - to_send.append(command) - - if changed: - self.logger.debug( - f"Detected changes on {scope if scope is not None else 'global'}, updating them" - ) - try: - existing_cmds = await self.req.put_slash_commands( - slash_commands=to_send, guild_id=scope - ) - except discord.HTTPException as ex: - if ex.status == 400: - # catch bad requests - cmd_nums = set( - re.findall(r"^[\w-]{1,32}$", ex.args[0]) - ) # find all discords references to commands - error_string = ex.args[0] - - for num in cmd_nums: - error_command = to_send[int(num)] - error_string = error_string.replace( - f"In {num}", - f"'{error_command.get('name')}'", - ) - - ex.args = (error_string,) - - raise ex - else: - self.logger.debug( - f"Detected no changes on {scope if scope is not None else 'global'}, skipping" - ) - - id_name_map = {} - for cmd in existing_cmds: - id_name_map[cmd["name"]] = cmd["id"] - - for cmd_name in permissions: - cmd_permissions = permissions[cmd_name] - cmd_id = id_name_map[cmd_name] - for applicable_guild in cmd_permissions: - if applicable_guild not in permissions_map: - permissions_map[applicable_guild] = [] - permission = { - "id": cmd_id, - "guild_id": applicable_guild, - "permissions": cmd_permissions[applicable_guild], - } - permissions_map[applicable_guild].append(permission) - - self.logger.info("Syncing permissions...") - self.logger.debug(f"Commands permission data are {permissions_map}") - for scope in permissions_map: - existing_perms = await self.req.get_all_guild_commands_permissions(scope) - new_perms = permissions_map[scope] - - changed = False - if len(existing_perms) != len(new_perms): - changed = True - else: - existing_perms_model = {} - for existing_perm in existing_perms: - existing_perms_model[existing_perm["id"]] = model.GuildPermissionsData( - **existing_perm - ) - for new_perm in new_perms: - if new_perm["id"] not in existing_perms_model: - changed = True - break - if existing_perms_model[new_perm["id"]] != model.GuildPermissionsData( - **new_perm - ): - changed = True - break - - if changed: - self.logger.debug(f"Detected permissions changes on {scope}, updating them") - await self.req.update_guild_commands_permissions(scope, new_perms) - else: - self.logger.debug(f"Detected no permissions changes on {scope}, skipping") - - if delete_from_unused_guilds: - self.logger.info("Deleting unused guild commands...") - other_guilds = [ - guild.id for guild in self._discord.guilds if guild.id not in cmds["guild"] - ] - # This is an extremly bad way to do this, because slash cmds can be in guilds the bot isn't in - # But it's the only way until discord makes an endpoint to request all the guild with cmds registered. - - for guild in other_guilds: - with suppress(discord.Forbidden): - existing = await self.req.get_all_commands(guild_id=guild) - if len(existing) != 0: - self.logger.debug(f"Deleting commands from {guild}") - await self.req.put_slash_commands(slash_commands=[], guild_id=guild) - - if delete_perms_from_unused_guilds: - self.logger.info("Deleting unused guild permissions...") - other_guilds = [ - guild.id for guild in self._discord.guilds if guild.id not in permissions_map.keys() - ] - for guild in other_guilds: - with suppress(discord.Forbidden): - self.logger.debug(f"Deleting permissions from {guild}") - existing_perms = await self.req.get_all_guild_commands_permissions(guild) - if len(existing_perms) != 0: - await self.req.update_guild_commands_permissions(guild, []) - - self.logger.info("Completed syncing all commands!") - - def add_slash_command( - self, - cmd, - name: str = None, - description: str = None, - guild_ids: typing.List[int] = None, - options: list = None, - default_permission: bool = True, - permissions: typing.Dict[int, list] = None, - connector: dict = None, - has_subcommands: bool = False, - ): - """ - Registers slash command to SlashCommand. - - .. warning:: - Just using this won't register slash command to Discord API. - To register it, check :meth:`.utils.manage_commands.add_slash_command` or simply enable `sync_commands`. - - :param cmd: Command Coroutine. - :type cmd: Coroutine - :param name: Name of the slash command. Default name of the coroutine. - :type name: str - :param description: Description of the slash command. Defaults to command docstring or ``None``. - :type description: str - :param guild_ids: List of Guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the slash command. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: list - :param default_permission: Sets if users have permission to run slash command by default, when no permissions are set. Default ``True``. - :type default_permission: bool - :param permissions: Dictionary of permissions of the slash command. Key being target guild_id and value being a list of permissions to apply. Default ``None``. - :type permissions: dict - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - :param has_subcommands: Whether it has subcommand. Default ``False``. - :type has_subcommands: bool - """ - name = name or cmd.__name__ - name = name.lower() - guild_ids = guild_ids if guild_ids else [] - if not all(isinstance(item, int) for item in guild_ids): - raise error.IncorrectGuildIDType( - f"The snowflake IDs {guild_ids} given are not a list of integers. Because of discord.py convention, please use integer IDs instead. Furthermore, the command '{name}' will be deactivated and broken until fixed." - ) - if name in self.commands: - tgt = self.commands[name] - if not tgt.has_subcommands: - raise error.DuplicateCommand(name) - has_subcommands = tgt.has_subcommands - for x in tgt.allowed_guild_ids: - if x not in guild_ids: - guild_ids.append(x) - - description = description or getdoc(cmd) - - if options is None: - options = manage_commands.generate_options(cmd, description, connector) - - _cmd = { - "func": cmd, - "description": description, - "guild_ids": guild_ids, - "api_options": options, - "default_permission": default_permission, - "api_permissions": permissions, - "connector": connector or {}, - "has_subcommands": has_subcommands, - } - obj = model.BaseCommandObject(name, _cmd) - self.commands[name] = obj - self.logger.debug(f"Added command `{name}`") - return obj - - def _cog_ext_add_context_menu(self, target: int, name: str, guild_ids: list = None): - """ - Creates a new cog_based context menu command. - - :param cmd: Command Coroutine. - :type cmd: Coroutine - :param name: The name of the command - :type name: str - :param _type: The context menu type. - :type _type: int - """ - - def add_context_menu(self, cmd, name: str, _type: int, guild_ids: list = None): - """ - Creates a new context menu command. - - :param cmd: Command Coroutine. - :type cmd: Coroutine - :param name: The name of the command - :type name: str - :param _type: The context menu type. - :type _type: int - """ - - name = [name or cmd.__name__][0] - guild_ids = guild_ids or [] - - if not all(isinstance(item, int) for item in guild_ids): - raise error.IncorrectGuildIDType( - f"The snowflake IDs {guild_ids} given are not a list of integers. Because of discord.py convention, please use integer IDs instead. Furthermore, the command '{name}' will be deactivated and broken until fixed." - ) - - if name in self.commands["context"]: - tgt = self.commands["context"][name] - if not tgt.has_subcommands: - raise error.DuplicateCommand(name) - has_subcommands = tgt.has_subcommands # noqa - for x in tgt.allowed_guild_ids: - if x not in guild_ids: - guild_ids.append(x) - - _cmd = { - "default_permission": None, - "has_permissions": None, - "name": name, - "type": _type, - "func": cmd, - "description": "", - "guild_ids": guild_ids, - "api_options": [], - "connector": {}, - "has_subcommands": False, - "api_permissions": {}, - } - - obj = model.BaseCommandObject(name, cmd=_cmd, _type=_type) - self.commands["context"][name] = obj - self.logger.debug(f"Added context command `{name}`") - return obj - - def add_subcommand( - self, - cmd, - base, - subcommand_group=None, - name=None, - description: str = None, - base_description: str = None, - base_default_permission: bool = True, - base_permissions: typing.Dict[int, list] = None, - subcommand_group_description: str = None, - guild_ids: typing.List[int] = None, - options: list = None, - connector: dict = None, - ): - """ - Registers subcommand to SlashCommand. - - :param cmd: Subcommand Coroutine. - :type cmd: Coroutine - :param base: Name of the base command. - :type base: str - :param subcommand_group: Name of the subcommand group, if any. Default ``None`` which represents there is no sub group. - :type subcommand_group: str - :param name: Name of the subcommand. Default name of the coroutine. - :type name: str - :param description: Description of the subcommand. Defaults to command docstring or ``None``. - :type description: str - :param base_description: Description of the base command. Default ``None``. - :type base_description: str - :param base_default_permission: Sets if users have permission to run base command by default, when no permissions are set. Default ``True``. - :type base_default_permission: bool - :param base_permissions: Dictionary of permissions of the slash command. Key being target guild_id and value being a list of permissions to apply. Default ``None``. - :type base_permissions: dict - :param subcommand_group_description: Description of the subcommand_group. Default ``None``. - :type subcommand_group_description: str - :param guild_ids: List of guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the subcommand. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: list - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - """ - base = base.lower() - subcommand_group = subcommand_group.lower() if subcommand_group else subcommand_group - name = name or cmd.__name__ - name = name.lower() - description = description or getdoc(cmd) - guild_ids = guild_ids if guild_ids else [] - if not all(isinstance(item, int) for item in guild_ids): - raise error.IncorrectGuildIDType( - f"The snowflake IDs {guild_ids} given are not a list of integers. Because of discord.py convention, please use integer IDs instead. Furthermore, the command '{name}' will be deactivated and broken until fixed." - ) - - if base in self.commands: - for x in guild_ids: - if x not in self.commands[base].allowed_guild_ids: - self.commands[base].allowed_guild_ids.append(x) - - if options is None: - options = manage_commands.generate_options(cmd, description, connector) - - _cmd = { - "func": None, - "description": base_description, - "guild_ids": guild_ids.copy(), - "api_options": [], - "default_permission": base_default_permission, - "api_permissions": base_permissions, - "connector": {}, - "has_subcommands": True, - } - _sub = { - "func": cmd, - "name": name, - "description": description, - "base_desc": base_description, - "sub_group_desc": subcommand_group_description, - "guild_ids": guild_ids, - "api_options": options, - "connector": connector or {}, - } - if base not in self.commands: - self.commands[base] = model.BaseCommandObject(base, _cmd) - else: - base_command = self.commands[base] - base_command.has_subcommands = True - if base_permissions: - for applicable_guild in base_permissions: - if applicable_guild not in base_command.permissions: - base_command.permissions[applicable_guild] = [] - base_command.permissions[applicable_guild].extend( - base_permissions[applicable_guild] - ) - if base_command.description: - _cmd["description"] = base_command.description - if base not in self.subcommands: - self.subcommands[base] = {} - if subcommand_group: - if subcommand_group not in self.subcommands[base]: - self.subcommands[base][subcommand_group] = {} - if name in self.subcommands[base][subcommand_group]: - raise error.DuplicateCommand(f"{base} {subcommand_group} {name}") - obj = model.SubcommandObject(_sub, base, name, subcommand_group) - self.subcommands[base][subcommand_group][name] = obj - else: - if name in self.subcommands[base]: - raise error.DuplicateCommand(f"{base} {name}") - obj = model.SubcommandObject(_sub, base, name) - self.subcommands[base][name] = obj - self.logger.debug( - f"Added subcommand `{base} {subcommand_group or ''} {name or cmd.__name__}`" - ) - return obj - - def slash( - self, - *, - name: str = None, - description: str = None, - guild_ids: typing.List[int] = None, - options: typing.List[dict] = None, - default_permission: bool = True, - permissions: dict = None, - connector: dict = None, - ): - """ - Decorator that registers coroutine as a slash command.\n - All decorator args must be passed as keyword-only args.\n - 1 arg for command coroutine is required for ctx(:class:`.model.SlashContext`), - and if your slash command has some args, then those args are also required.\n - All args must be passed as keyword-args. - - .. note:: - If you don't pass `options` but has extra args, then it will automatically generate options. - However, it is not recommended to use it since descriptions will be "No Description." or the command's description. - - .. warning:: - Unlike discord.py's command, ``*args``, keyword-only args, converters, etc. are not supported or behave differently. - - Example: - - .. code-block:: python - - @slash.slash(name="ping") - async def _slash(ctx): # Normal usage. - await ctx.send(content=f"Pong! (`{round(bot.latency*1000)}`ms)") - - - @slash.slash(name="pick") - async def _pick(ctx, choice1, choice2): # Command with 1 or more args. - await ctx.send(content=str(random.choice([choice1, choice2]))) - - To format the connector, follow this example. - - .. code-block:: python - - { - "example-arg": "example_arg", - "시간": "hour" - # Formatting connector is required for - # using other than english for option parameter name - # for in case. - } - - Set discord UI's parameter name as key, and set command coroutine's arg name as value. - - :param name: Name of the slash command. Default name of the coroutine. - :type name: str - :param description: Description of the slash command. Default ``None``. - :type description: str - :param guild_ids: List of Guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the slash command. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: List[dict] - :param default_permission: Sets if users have permission to run slash command by default, when no permissions are set. Default ``True``. - :type default_permission: bool - :param permissions: Permission requirements of the slash command. Default ``None``. - :type permissions: dict - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - """ - if not permissions: - permissions = {} - - def wrapper(cmd): - decorator_permissions = getattr(cmd, "__permissions__", None) - if decorator_permissions: - permissions.update(decorator_permissions) - - obj = self.add_slash_command( - cmd, - name, - description, - guild_ids, - options, - default_permission, - permissions, - connector, - ) - - return obj - - return wrapper - - def subcommand( - self, - *, - base, - subcommand_group=None, - name=None, - description: str = None, - base_description: str = None, - base_desc: str = None, - base_default_permission: bool = True, - base_permissions: dict = None, - subcommand_group_description: str = None, - sub_group_desc: str = None, - guild_ids: typing.List[int] = None, - options: typing.List[dict] = None, - connector: dict = None, - ): - """ - Decorator that registers subcommand.\n - Unlike discord.py, you don't need base command.\n - All args must be passed as keyword-args. - - .. note:: - If you don't pass `options` but has extra args, then it will automatically generate options. - However, it is not recommended to use it since descriptions will be "No Description." or the command's description. - - .. warning:: - Unlike discord.py's command, ``*args``, keyword-only args, converters, etc. are not supported or behave differently. - - Example: - - .. code-block:: python - - # /group say - @slash.subcommand(base="group", name="say") - async def _group_say(ctx, _str): - await ctx.send(content=_str) - - # /group kick user - @slash.subcommand(base="group", - subcommand_group="kick", - name="user") - async def _group_kick_user(ctx, user): - ... - - :param base: Name of the base command. - :type base: str - :param subcommand_group: Name of the subcommand group, if any. Default ``None`` which represents there is no sub group. - :type subcommand_group: str - :param name: Name of the subcommand. Default name of the coroutine. - :type name: str - :param description: Description of the subcommand. Default ``None``. - :type description: str - :param base_description: Description of the base command. Default ``None``. - :type base_description: str - :param base_desc: Alias of ``base_description``. - :param base_default_permission: Sets if users have permission to run slash command by default, when no permissions are set. Default ``True``. - :type base_default_permission: bool - :param permissions: Permission requirements of the slash command. Default ``None``. - :type permissions: dict - :param subcommand_group_description: Description of the subcommand_group. Default ``None``. - :type subcommand_group_description: str - :param sub_group_desc: Alias of ``subcommand_group_description``. - :param guild_ids: List of guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the subcommand. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: List[dict] - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - """ - base_description = base_description or base_desc - subcommand_group_description = subcommand_group_description or sub_group_desc - if not base_permissions: - base_permissions = {} - - def wrapper(cmd): - decorator_permissions = getattr(cmd, "__permissions__", None) - if decorator_permissions: - base_permissions.update(decorator_permissions) - - obj = self.add_subcommand( - cmd, - base, - subcommand_group, - name, - description, - base_description, - base_default_permission, - base_permissions, - subcommand_group_description, - guild_ids, - options, - connector, - ) - - return obj - - return wrapper - - def permission(self, guild_id: int, permissions: list): - """ - Decorator that add permissions. This will set the permissions for a single guild, you can use it more than once for each command. - - :param guild_id: ID of the guild for the permissions. - :type guild_id: int - :param permissions: List of permissions to be set for the specified guild. - :type permissions: list - """ - - def wrapper(cmd): - if not getattr(cmd, "__permissions__", None): - cmd.__permissions__ = {} - cmd.__permissions__[guild_id] = permissions - return cmd - - return wrapper - - def context_menu(self, *, target: int, name: str, guild_ids: list = None): - """ - Decorator that adds context menu commands. - - :param target: The type of menu. - :type target: int - :param name: A name to register as the command in the menu. - :type name: str - :param guild_ids: A list of guild IDs to register the command under. Defaults to ``None``. - :type guild_ids: list - """ - - def wrapper(cmd): - # _obj = self.add_slash_command( - # cmd, - # name, - # "", - # guild_ids - # ) - - # This has to call both, as its a arg-less menu. - - obj = self.add_context_menu(cmd, name, target, guild_ids) - - return obj - - return wrapper - - def add_component_callback( - self, - callback: typing.Coroutine, - *, - messages: typing.Union[int, discord.Message, list] = None, - components: typing.Union[str, dict, list] = None, - use_callback_name=True, - component_type: int = None, - ): - """ - Adds a coroutine callback to a component. - Callback can be made to only accept component interactions from a specific messages - and/or custom_ids of components. - - :param Coroutine callback: The coroutine to be called when the component is interacted with. Must accept a single argument with the type :class:`.context.ComponentContext`. - :param messages: If specified, only interactions from the message given will be accepted. Can be a message object to check for, or the message ID or list of previous two. Empty list will mean that no interactions are accepted. - :type messages: Union[discord.Message, int, list] - :param components: If specified, only interactions with ``custom_id`` of given components will be accepted. Defaults to the name of ``callback`` if ``use_callback_name=True``. Can be a custom ID (str) or component dict (actionrow or button) or list of previous two. - :type components: Union[str, dict, list] - :param use_callback_name: Whether the ``custom_id`` defaults to the name of ``callback`` if unspecified. If ``False``, either ``messages`` or ``components`` must be specified. - :type use_callback_name: bool - :param component_type: The type of the component to avoid collisions with other component types. See :class:`.model.ComponentType`. - :type component_type: Optional[int] - :raises: .error.DuplicateCustomID, .error.IncorrectFormat - """ - - message_ids = list(get_messages_ids(messages)) if messages is not None else [None] - custom_ids = list(get_components_ids(components)) if components is not None else [None] - - if use_callback_name and custom_ids == [None]: - custom_ids = [callback.__name__] - - if message_ids == [None] and custom_ids == [None]: - raise error.IncorrectFormat("You must specify messages or components (or both)") - - callback_obj = model.ComponentCallbackObject( - callback, message_ids, custom_ids, component_type - ) - self._add_comp_callback_obj(callback_obj) - return callback_obj - - def _add_comp_callback_obj(self, callback_obj): - component_type = callback_obj.component_type - - for message_id, custom_id in callback_obj.keys: - self._register_comp_callback_obj(callback_obj, message_id, custom_id, component_type) - - def _register_comp_callback_obj(self, callback_obj, message_id, custom_id, component_type): - message_id_dict = self.components - custom_id_dict = message_id_dict.setdefault(message_id, {}) - component_type_dict = custom_id_dict.setdefault(custom_id, {}) - - if component_type in component_type_dict: - raise error.DuplicateCallback(message_id, custom_id, component_type) - - component_type_dict[component_type] = callback_obj - self.logger.debug( - f"Added component callback for " - f"message ID {message_id or ''}, " - f"custom_id `{custom_id or ''}`, " - f"component_type `{component_type or ''}`" - ) - - def extend_component_callback( - self, - callback_obj: model.ComponentCallbackObject, - message_id: int = None, - custom_id: str = None, - ): - """ - Registers existing callback object (:class:`.model.ComponentCallbackObject`) - for specific combination of message_id, custom_id, component_type. - - :param callback_obj: callback object. - :type callback_obj: model.ComponentCallbackObject - :param message_id: If specified, only removes the callback for the specific message ID. - :type message_id: Optional[.model] - :param custom_id: The ``custom_id`` of the component. - :type custom_id: Optional[str] - :raises: .error.DuplicateCustomID, .error.IncorrectFormat - """ - - component_type = callback_obj.component_type - self._register_comp_callback_obj(callback_obj, message_id, custom_id, component_type) - callback_obj.keys.add((message_id, custom_id)) - - def get_component_callback( - self, - message_id: int = None, - custom_id: str = None, - component_type: int = None, - ): - """ - Returns component callback (or None if not found) for specific combination of message_id, custom_id, component_type. - - :param message_id: If specified, only removes the callback for the specific message ID. - :type message_id: Optional[.model] - :param custom_id: The ``custom_id`` of the component. - :type custom_id: Optional[str] - :param component_type: The type of the component. See :class:`.model.ComponentType`. - :type component_type: Optional[int] - - :return: Optional[model.ComponentCallbackObject] - """ - message_id_dict = self.components - try: - custom_id_dict = _get_val(message_id_dict, message_id) - component_type_dict = _get_val(custom_id_dict, custom_id) - callback = _get_val(component_type_dict, component_type) - - except KeyError: # there was no key in dict and no global fallback - pass - else: - return callback - - def remove_component_callback( - self, message_id: int = None, custom_id: str = None, component_type: int = None - ): - """ - Removes a component callback from specific combination of message_id, custom_id, component_type. - - :param message_id: If specified, only removes the callback for the specific message ID. - :type message_id: Optional[int] - :param custom_id: The ``custom_id`` of the component. - :type custom_id: Optional[str] - :param component_type: The type of the component. See :class:`.model.ComponentType`. - :type component_type: Optional[int] - :raises: .error.IncorrectFormat - """ - try: - callback = self.components[message_id][custom_id].pop(component_type) - if not self.components[message_id][custom_id]: # delete dict nesting levels if empty - self.components[message_id].pop(custom_id) - if not self.components[message_id]: - self.components.pop(message_id) - except KeyError: - raise error.IncorrectFormat( - f"Callback for " - f"message ID `{message_id or ''}`, " - f"custom_id `{custom_id or ''}`, " - f"component_type `{component_type or ''}` is not registered!" - ) - else: - callback.keys.remove((message_id, custom_id)) - - def remove_component_callback_obj(self, callback_obj: model.ComponentCallbackObject): - """ - Removes a component callback from all related message_id, custom_id listeners. - - :param callback_obj: callback object. - :type callback_obj: model.ComponentCallbackObject - :raises: .error.IncorrectFormat - """ - if not callback_obj.keys: - raise error.IncorrectFormat("Callback already removed from any listeners") - - component_type = callback_obj.component_type - for message_id, custom_id in callback_obj.keys.copy(): - self.remove_component_callback(message_id, custom_id, component_type) - - def component_callback( - self, - *, - messages: typing.Union[int, discord.Message, list] = None, - components: typing.Union[str, dict, list] = None, - use_callback_name=True, - component_type: int = None, - ): - """ - Decorator that registers a coroutine as a component callback. - Adds a coroutine callback to a component. - Callback can be made to only accept component interactions from a specific messages - and/or custom_ids of components. - - :param messages: If specified, only interactions from the message given will be accepted. Can be a message object to check for, or the message ID or list of previous two. Empty list will mean that no interactions are accepted. - :type messages: Union[discord.Message, int, list] - :param components: If specified, only interactions with ``custom_id`` of given components will be accepted. Defaults to the name of ``callback`` if ``use_callback_name=True``. Can be a custom ID (str) or component dict (actionrow or button) or list of previous two. - :type components: Union[str, dict, list] - :param use_callback_name: Whether the ``custom_id`` defaults to the name of ``callback`` if unspecified. If ``False``, either ``messages`` or ``components`` must be specified. - :type use_callback_name: bool - :param component_type: The type of the component to avoid collisions with other component types. See :class:`.model.ComponentType`. - :type component_type: Optional[int] - :raises: .error.DuplicateCustomID, .error.IncorrectFormat - """ - - def wrapper(callback): - return self.add_component_callback( - callback, - messages=messages, - components=components, - use_callback_name=use_callback_name, - component_type=component_type, - ) - - return wrapper - - async def process_options( - self, - guild: discord.Guild, - options: list, - connector: dict, - temporary_auto_convert: dict = None, - ) -> dict: - """ - Processes Role, User, and Channel option types to discord.py's models. - - :param guild: Guild of the command message. - :type guild: discord.Guild - :param options: Dict of options. - :type options: list - :param connector: Kwarg connector. - :param temporary_auto_convert: Temporary parameter, use this if options doesn't have ``type`` keyword. - :return: Union[list, dict] - """ - - if not guild or not isinstance(guild, discord.Guild): - return {connector.get(x["name"]) or x["name"]: x["value"] for x in options} - - converters = [ - # If extra converters are added and some needs to fetch it, - # you should pass as a list with 1st item as a cache get method - # and 2nd as a actual fetching method. - [guild.get_member, guild.fetch_member], - guild.get_channel, - guild.get_role, - ] - - types = { - "user": 0, - "USER": 0, - model.SlashCommandOptionType.USER: 0, - "6": 0, - 6: 0, - "channel": 1, - "CHANNEL": 1, - model.SlashCommandOptionType.CHANNEL: 1, - "7": 1, - 7: 1, - "role": 2, - "ROLE": 2, - model.SlashCommandOptionType.ROLE: 2, - 8: 2, - "8": 2, - } - - to_return = {} - - for x in options: - processed = None # This isn't the best way, but we should to reduce duplicate lines. - - # This is to temporarily fix Issue #97, that on Android device - # does not give option type from API. - if "type" not in x: - x["type"] = temporary_auto_convert[x["name"]] - - if x["type"] not in types: - processed = x["value"] - else: - loaded_converter = converters[types[x["type"]]] - if isinstance(loaded_converter, list): # For user type. - cache_first = loaded_converter[0](int(x["value"])) - if cache_first: - processed = cache_first - else: - loaded_converter = loaded_converter[1] - if not processed: - try: - processed = ( - await loaded_converter(int(x["value"])) - if iscoroutinefunction(loaded_converter) - else loaded_converter(int(x["value"])) - ) - except ( - discord.Forbidden, - discord.HTTPException, - discord.NotFound, - ): # Just in case. - self.logger.warning("Failed fetching discord object! Passing ID instead.") - processed = int(x["value"]) - to_return[connector.get(x["name"]) or x["name"]] = processed - return to_return - - async def invoke_command(self, func, ctx, args): - """ - Invokes command. - - :param func: Command coroutine. - :param ctx: Context. - :param args: Args. Can be list or dict. - """ - try: - if isinstance(args, dict): - ctx.kwargs = args - ctx.args = list(args.values()) - await func.invoke(ctx, **args) - except Exception as ex: - if not await self._handle_invoke_error(func, ctx, ex): - await self.on_slash_command_error(ctx, ex) - - async def invoke_component_callback(self, func, ctx): - """ - Invokes component callback. - - :param func: Component callback object. - :param ctx: Context. - """ - try: - await func.invoke(ctx) - except Exception as ex: - if not await self._handle_invoke_error(func, ctx, ex): - await self.on_component_callback_error(ctx, ex) - - async def _handle_invoke_error(self, func, ctx, ex): - if hasattr(func, "on_error"): - if func.on_error is not None: - try: - if hasattr(func, "cog"): - await func.on_error(func.cog, ctx, ex) - else: - await func.on_error(ctx, ex) - return True - except Exception as e: - self.logger.error(f"{ctx.command}:: Error using error decorator: {e}") - return False - - async def on_socket_response(self, msg): - """ - This event listener is automatically registered at initialization of this class. - - .. warning:: - DO NOT MANUALLY REGISTER, OVERRIDE, OR WHATEVER ACTION TO THIS COROUTINE UNLESS YOU KNOW WHAT YOU ARE DOING. - - :param msg: Gateway message. - """ - if msg["t"] != "INTERACTION_CREATE": - return - - to_use = msg["d"] - interaction_type = to_use["type"] - - # it's time to start using the new module guys, - # sorry. - warnings.warn( - message='This pip module is now deprecated as of version 3.0.1a! Please use the new module "discord-py-interactions" to gain access to 3.0.2 and future versions.', - category=DeprecationWarning, - ) - - # dis_snek variance seq - - if interaction_type in (1, 2): - await self._on_slash(to_use) - await self._on_context_menu(to_use) - elif interaction_type == 3: - try: - await self._on_component(to_use) # noqa - except KeyError: - pass # for some reason it complains about custom_id being an optional arg when it's fine? - finally: - await self._on_context_menu(to_use) - else: - raise NotImplementedError( - f"Unknown Interaction Received: {interaction_type}" - ) # check if discord does a sneaky event change on us - return - - async def _on_component(self, to_use): - ctx = context.ComponentContext(self.req, to_use, self._discord, self.logger) - self._discord.dispatch("component", ctx) - - callback = self.get_component_callback( - ctx.origin_message_id, ctx.custom_id, ctx.component_type - ) - if callback is not None: - self._discord.dispatch("component_callback", ctx, callback) - await self.invoke_component_callback(callback, ctx) - - async def _on_slash(self, to_use): # slash commands only. - if to_use["data"]["name"] in self.commands: - - ctx = context.SlashContext(self.req, to_use, self._discord, self.logger) - cmd_name = to_use["data"]["name"] - - if cmd_name not in self.commands and cmd_name in self.subcommands: - return await self.handle_subcommand(ctx, to_use) - - selected_cmd = self.commands[to_use["data"]["name"]] - - if type(selected_cmd) == dict: - return # this is context dict storage. - - if selected_cmd._type != 1: - return # If its a menu, ignore. - - if ( - selected_cmd.allowed_guild_ids - and ctx.guild_id not in selected_cmd.allowed_guild_ids - ): - return - - if selected_cmd.has_subcommands and not selected_cmd.func: - return await self.handle_subcommand(ctx, to_use) - - if "options" in to_use["data"]: - for x in to_use["data"]["options"]: - if "value" not in x: - return await self.handle_subcommand(ctx, to_use) - - # This is to temporarily fix Issue #97, that on Android device - # does not give option type from API. - temporary_auto_convert = {} - for x in selected_cmd.options: - temporary_auto_convert[x["name"].lower()] = x["type"] - - args = ( - await self.process_options( - ctx.guild, - to_use["data"]["options"], - selected_cmd.connector, - temporary_auto_convert, - ) - if "options" in to_use["data"] - else {} - ) - - self._discord.dispatch("slash_command", ctx) - - await self.invoke_command(selected_cmd, ctx, args) - - async def _on_context_menu(self, to_use): - # Slash Command Logic - - # to prevent any potential keyerrors: - if "name" not in to_use["data"].keys(): - return - - if to_use["data"]["name"] in self.commands["context"]: - ctx = context.MenuContext(self.req, to_use, self._discord, self.logger) - cmd_name = to_use["data"]["name"] - - if cmd_name not in self.commands["context"] and cmd_name in self.subcommands: - return # menus don't have subcommands you smooth brain - - selected_cmd = self.commands["context"][cmd_name] - - if ( - selected_cmd.allowed_guild_ids - and ctx.guild_id not in selected_cmd.allowed_guild_ids - ): - return - - if selected_cmd.has_subcommands and not selected_cmd.func: - return await self.handle_subcommand(ctx, to_use) - - if "options" in to_use["data"]: - for x in to_use["data"]["options"]: - if "value" not in x: - return await self.handle_subcommand(ctx, to_use) - - self._discord.dispatch("context_menu", ctx) - - await self.invoke_command(selected_cmd, ctx, args={}) - - # Cog Logic - - elif to_use["data"]["name"] in self.commands: - ctx = context.MenuContext(self.req, to_use, self._discord, self.logger) - cmd_name = to_use["data"]["name"] - - if cmd_name not in self.commands and cmd_name in self.subcommands: - return # menus don't have subcommands you smooth brain - - selected_cmd = self.commands[cmd_name] - if type(selected_cmd) == dict: - return # Get rid of any selection thats a dict somehow - if selected_cmd._type == 1: # noqa - return # Slash command obj. - - if ( - selected_cmd.allowed_guild_ids - and ctx.guild_id not in selected_cmd.allowed_guild_ids - ): - return - - if selected_cmd.has_subcommands and not selected_cmd.func: - return await self.handle_subcommand(ctx, to_use) - - if "options" in to_use["data"]: - for x in to_use["data"]["options"]: - if "value" not in x: - return await self.handle_subcommand(ctx, to_use) - - self._discord.dispatch("context_menu", ctx) - - await self.invoke_command(selected_cmd, ctx, args={}) - - async def handle_subcommand(self, ctx: context.SlashContext, data: dict): - """ - Coroutine for handling subcommand. - - .. warning:: - Do not manually call this. - - :param ctx: :class:`.model.SlashContext` instance. - :param data: Gateway message. - """ - if data["data"]["name"] not in self.subcommands: - return - base = self.subcommands[data["data"]["name"]] - sub = data["data"]["options"][0] - sub_name = sub["name"] - if sub_name not in base: - return - ctx.subcommand_name = sub_name - sub_opts = sub["options"] if "options" in sub else [] - for x in sub_opts: - if "options" in x or "value" not in x: - sub_group = x["name"] - if sub_group not in base[sub_name]: - return - ctx.subcommand_group = sub_group - selected = base[sub_name][sub_group] - - # This is to temporarily fix Issue #97, that on Android device - # does not give option type from API. - temporary_auto_convert = {} - for n in selected.options: - temporary_auto_convert[n["name"].lower()] = n["type"] - - args = ( - await self.process_options( - ctx.guild, x["options"], selected.connector, temporary_auto_convert - ) - if "options" in x - else {} - ) - self._discord.dispatch("slash_command", ctx) - await self.invoke_command(selected, ctx, args) - return - selected = base[sub_name] - - # This is to temporarily fix Issue #97, that on Android device - # does not give option type from API. - temporary_auto_convert = {} - for n in selected.options: - temporary_auto_convert[n["name"].lower()] = n["type"] - - args = ( - await self.process_options( - ctx.guild, sub_opts, selected.connector, temporary_auto_convert - ) - if "options" in sub - else {} - ) - self._discord.dispatch("slash_command", ctx) - await self.invoke_command(selected, ctx, args) - - def _on_error(self, ctx, ex, event_name): - on_event = "on_" + event_name - if self.has_listener: - if self._discord.extra_events.get(on_event): - self._discord.dispatch(event_name, ctx, ex) - return True - if hasattr(self._discord, on_event): - self._discord.dispatch(event_name, ctx, ex) - return True - return False - - async def on_slash_command_error(self, ctx, ex): - """ - Handles Exception occurred from invoking command. - - Example of adding event: - - .. code-block:: python - - @client.event - async def on_slash_command_error(ctx, ex): - ... - - Example of adding listener: - - .. code-block:: python - - @bot.listen() - async def on_slash_command_error(ctx, ex): - ... - - :param ctx: Context of the command. - :type ctx: :class:`.model.SlashContext` - :param ex: Exception from the command invoke. - :type ex: Exception - :return: - """ - if not self._on_error(ctx, ex, "slash_command_error"): - # Prints exception if not overridden or has no listener for error. - self.logger.exception( - f"An exception has occurred while executing command `{ctx.name}`:" - ) - - async def on_component_callback_error(self, ctx, ex): - """ - Handles Exception occurred from invoking component callback. - - Example of adding event: - - .. code-block:: python - - @client.event - async def on_component_callback_error(ctx, ex): - ... - - Example of adding listener: - - .. code-block:: python - - @bot.listen() - async def on_component_callback_error(ctx, ex): - ... - - :param ctx: Context of the callback. - :type ctx: :class:`.model.ComponentContext` - :param ex: Exception from the command invoke. - :type ex: Exception - :return: - """ - if not self._on_error(ctx, ex, "component_callback_error"): - # Prints exception if not overridden or has no listener for error. - self.logger.exception( - f"An exception has occurred while executing component callback custom ID `{ctx.custom_id}`:" - ) diff --git a/discord_slash/cog_ext.py b/discord_slash/cog_ext.py deleted file mode 100644 index b220845d..00000000 --- a/discord_slash/cog_ext.py +++ /dev/null @@ -1,291 +0,0 @@ -import inspect -import typing - -import discord - -from .error import IncorrectFormat, IncorrectGuildIDType -from .model import CogBaseCommandObject, CogComponentCallbackObject, CogSubcommandObject -from .utils import manage_commands -from .utils.manage_components import get_components_ids, get_messages_ids - - -def cog_slash( - *, - name: str = None, - description: str = None, - guild_ids: typing.List[int] = None, - options: typing.List[dict] = None, - default_permission: bool = True, - permissions: typing.Dict[int, list] = None, - connector: dict = None, -): - """ - Decorator for Cog to add slash command.\n - Almost same as :meth:`.client.SlashCommand.slash`. - - Example: - - .. code-block:: python - - class ExampleCog(commands.Cog): - def __init__(self, bot): - self.bot = bot - - @cog_ext.cog_slash(name="ping") - async def ping(self, ctx: SlashContext): - await ctx.send(content="Pong!") - - :param name: Name of the slash command. Default name of the coroutine. - :type name: str - :param description: Description of the slash command. Default ``None``. - :type description: str - :param guild_ids: List of Guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the slash command. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: List[dict] - :param default_permission: Sets if users have permission to run slash command by default, when no permissions are set. Default ``True``. - :type default_permission: bool - :param permissions: Dictionary of permissions of the slash command. Key being target guild_id and value being a list of permissions to apply. Default ``None``. - :type permissions: dict - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - """ - if not permissions: - permissions = {} - - def wrapper(cmd): - decorator_permissions = getattr(cmd, "__permissions__", None) - if decorator_permissions: - permissions.update(decorator_permissions) - - desc = description or inspect.getdoc(cmd) - if options is None: - opts = manage_commands.generate_options(cmd, desc, connector) - else: - opts = options - - if guild_ids and not all(isinstance(item, int) for item in guild_ids): - raise IncorrectGuildIDType( - f"The snowflake IDs {guild_ids} given are not a list of integers. Because of discord.py convention, please use integer IDs instead. Furthermore, the command '{name or cmd.__name__}' will be deactivated and broken until fixed." - ) - - _cmd = { - "func": cmd, - "description": desc, - "guild_ids": guild_ids, - "api_options": opts, - "default_permission": default_permission, - "api_permissions": permissions, - "connector": connector, - "has_subcommands": False, - } - return CogBaseCommandObject(name or cmd.__name__, _cmd) - - return wrapper - - -def cog_subcommand( - *, - base, - subcommand_group=None, - name=None, - description: str = None, - base_description: str = None, - base_desc: str = None, - base_default_permission: bool = True, - base_permissions: typing.Dict[int, list] = None, - subcommand_group_description: str = None, - sub_group_desc: str = None, - guild_ids: typing.List[int] = None, - options: typing.List[dict] = None, - connector: dict = None, -): - """ - Decorator for Cog to add subcommand.\n - Almost same as :meth:`.client.SlashCommand.subcommand`. - - Example: - - .. code-block:: python - - class ExampleCog(commands.Cog): - def __init__(self, bot): - self.bot = bot - - @cog_ext.cog_subcommand(base="group", name="say") - async def group_say(self, ctx: SlashContext, text: str): - await ctx.send(content=text) - - :param base: Name of the base command. - :type base: str - :param subcommand_group: Name of the subcommand group, if any. Default ``None`` which represents there is no sub group. - :type subcommand_group: str - :param name: Name of the subcommand. Default name of the coroutine. - :type name: str - :param description: Description of the subcommand. Default ``None``. - :type description: str - :param base_description: Description of the base command. Default ``None``. - :type base_description: str - :param base_desc: Alias of ``base_description``. - :param base_default_permission: Sets if users have permission to run slash command by default, when no permissions are set. Default ``True``. - :type base_default_permission: bool - :param base_permissions: Dictionary of permissions of the slash command. Key being target guild_id and value being a list of permissions to apply. Default ``None``. - :type base_permissions: dict - :param subcommand_group_description: Description of the subcommand_group. Default ``None``. - :type subcommand_group_description: str - :param sub_group_desc: Alias of ``subcommand_group_description``. - :param guild_ids: List of guild ID of where the command will be used. Default ``None``, which will be global command. - :type guild_ids: List[int] - :param options: Options of the subcommand. This will affect ``auto_convert`` and command data at Discord API. Default ``None``. - :type options: List[dict] - :param connector: Kwargs connector for the command. Default ``None``. - :type connector: dict - """ - base_description = base_description or base_desc - subcommand_group_description = subcommand_group_description or sub_group_desc - guild_ids = guild_ids if guild_ids else [] - if not base_permissions: - base_permissions = {} - - def wrapper(cmd): - decorator_permissions = getattr(cmd, "__permissions__", None) - if decorator_permissions: - base_permissions.update(decorator_permissions) - - desc = description or inspect.getdoc(cmd) - if options is None: - opts = manage_commands.generate_options(cmd, desc, connector) - else: - opts = options - - if guild_ids and not all(isinstance(item, int) for item in guild_ids): - raise IncorrectGuildIDType( - f"The snowflake IDs {guild_ids} given are not a list of integers. Because of discord.py convention, please use integer IDs instead. Furthermore, the command '{name or cmd.__name__}' will be deactivated and broken until fixed." - ) - - _cmd = { - "func": None, - "description": base_description, - "guild_ids": guild_ids.copy(), - "api_options": [], - "default_permission": base_default_permission, - "api_permissions": base_permissions, - "connector": {}, - "has_subcommands": True, - } - - _sub = { - "func": cmd, - "name": name or cmd.__name__, - "description": desc, - "base_desc": base_description, - "sub_group_desc": subcommand_group_description, - "guild_ids": guild_ids, - "api_options": opts, - "connector": connector, - } - return CogSubcommandObject(base, _cmd, subcommand_group, name or cmd.__name__, _sub) - - return wrapper - - -# I don't feel comfortable with having these right now, they're too buggy even when they were working. - - -def cog_context_menu(*, name: str, guild_ids: list = None, target: int = 1): - """ - Decorator that adds context menu commands. - - :param target: The type of menu. - :type target: int - :param name: A name to register as the command in the menu. - :type name: str - :param guild_ids: A list of guild IDs to register the command under. Defaults to ``None``. - :type guild_ids: list - """ - - def wrapper(cmd): - if name == "context": - raise IncorrectFormat( - "The name 'context' can not be used to register as a cog context menu," - "as this conflicts with this lib's checks. Please use a different name instead." - ) - - _cmd = { - "default_permission": None, - "has_permissions": None, - "name": name, - "type": target, - "func": cmd, - "description": "", - "guild_ids": guild_ids, - "api_options": [], - "connector": {}, - "has_subcommands": False, - "api_permissions": {}, - } - return CogBaseCommandObject(name or cmd.__name__, _cmd, target) - - return wrapper - - -def permission(guild_id: int, permissions: list): - """ - Decorator that add permissions. This will set the permissions for a single guild, you can use it more than once for each command. - - :param guild_id: ID of the guild for the permissions. - :type guild_id: int - :param permissions: List of permissions to be set for the specified guild. - :type permissions: list - """ - - def wrapper(cmd): - if not getattr(cmd, "__permissions__", None): - cmd.__permissions__ = {} - cmd.__permissions__[guild_id] = permissions - return cmd - - return wrapper - - -def cog_component( - *, - messages: typing.Union[int, discord.Message, list] = None, - components: typing.Union[str, dict, list] = None, - use_callback_name=True, - component_type: int = None, -): - """ - Decorator for component callbacks in cogs.\n - Almost same as :meth:`.client.SlashCommand.component_callback`. - - :param messages: If specified, only interactions from the message given will be accepted. Can be a message object to check for, or the message ID or list of previous two. Empty list will mean that no interactions are accepted. - :type messages: Union[discord.Message, int, list] - :param components: If specified, only interactions with ``custom_id`` of given components will be accepted. Defaults to the name of ``callback`` if ``use_callback_name=True``. Can be a custom ID (str) or component dict (actionrow or button) or list of previous two. - :type components: Union[str, dict, list] - :param use_callback_name: Whether the ``custom_id`` defaults to the name of ``callback`` if unspecified. If ``False``, either ``messages`` or ``components`` must be specified. - :type use_callback_name: bool - :param component_type: The type of the component to avoid collisions with other component types. See :class:`.model.ComponentType`. - :type component_type: Optional[int] - :raises: .error.DuplicateCustomID, .error.IncorrectFormat - """ - message_ids = list(get_messages_ids(messages)) if messages is not None else [None] - custom_ids = list(get_components_ids(components)) if components is not None else [None] - - def wrapper(callback): - nonlocal custom_ids - - if use_callback_name and custom_ids == [None]: - custom_ids = [callback.__name__] - - if message_ids == [None] and custom_ids == [None]: - raise IncorrectFormat("You must specify messages or components (or both)") - - return CogComponentCallbackObject( - callback, - message_ids=message_ids, - custom_ids=custom_ids, - component_type=component_type, - ) - - return wrapper diff --git a/discord_slash/const.py b/discord_slash/const.py deleted file mode 100644 index 2f1a7f85..00000000 --- a/discord_slash/const.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Discord Slash Constants""" - -__version__ = "3.0.3" - -BASE_API = "https://discord.com/api/v8" diff --git a/discord_slash/context.py b/discord_slash/context.py deleted file mode 100644 index efa34f27..00000000 --- a/discord_slash/context.py +++ /dev/null @@ -1,776 +0,0 @@ -import datetime -import typing -from typing import TYPE_CHECKING -from warnings import warn - -import discord -from discord.ext import commands -from discord.utils import snowflake_time - -from . import error, http, model -from .dpy_overrides import ComponentMessage - -if TYPE_CHECKING: # circular import sucks for typehinting - from . import client - - -class InteractionContext: - """ - Base context for interactions.\n - In some ways similar with discord.ext.commands.Context. - - .. warning:: - Do not manually init this model. - - :ivar message: Message that invoked the slash command. - :ivar interaction_id: Interaction ID of the command message. - :ivar bot: discord.py client. - :ivar _http: :class:`.http.SlashCommandRequest` of the client. - :ivar _logger: Logger instance. - :ivar data: The raw data of the interaction. - :ivar values: The values sent with the interaction. Currently for selects. - :ivar deferred: Whether the command is current deferred (loading state) - :ivar _deferred_hidden: Internal var to check that state stays the same - :ivar responded: Whether you have responded with a message to the interaction. - :ivar guild_id: Guild ID of the command message. If the command was invoked in DM, then it is ``None`` - :ivar author_id: User ID representing author of the command message. - :ivar channel_id: Channel ID representing channel of the command message. - :ivar author: User or Member instance of the command invoke. - - """ - - def __init__( - self, - _http: http.SlashCommandRequest, - _json: dict, - _discord: typing.Union[discord.Client, commands.Bot], - logger, - ): - self._token = _json["token"] - self.message = None - self.menu_messages = None - self.data = _json["data"] - self.interaction_id = _json["id"] - self._http = _http - self.bot = _discord - self._logger = logger - self.deferred = False - self.responded = False - self.values = _json["data"]["values"] if "values" in _json["data"] else None - self._deferred_hidden = False # To check if the patch to the deferred response matches - self.guild_id = int(_json["guild_id"]) if "guild_id" in _json.keys() else None - self.author_id = int( - _json["member"]["user"]["id"] if "member" in _json.keys() else _json["user"]["id"] - ) - self.channel_id = int(_json["channel_id"]) - if self.guild: - self.author = discord.Member( - data=_json["member"], state=self.bot._connection, guild=self.guild - ) - elif self.guild_id: - self.author = discord.User(data=_json["member"]["user"], state=self.bot._connection) - else: - self.author = discord.User(data=_json["user"], state=self.bot._connection) - self.created_at: datetime.datetime = snowflake_time(int(self.interaction_id)) - - @property - def _deffered_hidden(self): - warn( - "`_deffered_hidden` as been renamed to `_deferred_hidden`.", - DeprecationWarning, - stacklevel=2, - ) - return self._deferred_hidden - - @_deffered_hidden.setter - def _deffered_hidden(self, value): - warn( - "`_deffered_hidden` as been renamed to `_deferred_hidden`.", - DeprecationWarning, - stacklevel=2, - ) - self._deferred_hidden = value - - @property - def deffered(self): - warn("`deffered` as been renamed to `deferred`.", DeprecationWarning, stacklevel=2) - return self.deferred - - @deffered.setter - def deffered(self, value): - warn("`deffered` as been renamed to `deferred`.", DeprecationWarning, stacklevel=2) - self.deferred = value - - @property - def guild(self) -> typing.Optional[discord.Guild]: - """ - Guild instance of the command invoke. If the command was invoked in DM, then it is ``None`` - - :return: Optional[discord.Guild] - """ - return self.bot.get_guild(self.guild_id) if self.guild_id else None - - @property - def channel(self) -> typing.Optional[typing.Union[discord.TextChannel, discord.DMChannel]]: - """ - Channel instance of the command invoke. - - :return: Optional[Union[discord.abc.GuildChannel, discord.abc.PrivateChannel]] - """ - return self.bot.get_channel(self.channel_id) - - @property - def voice_client(self) -> typing.Optional[discord.VoiceProtocol]: - """ - VoiceClient instance of the command invoke. If the command was invoked in DM, then it is ``None``. - If the bot is not connected to any Voice/Stage channels, then it is ``None``. - - :return: Optional[discord.VoiceProtocol] - """ - return self.guild.voice_client if self.guild else None - - @property - def me(self) -> typing.Union[discord.Member, discord.ClientUser]: - """ - Bot member instance of the command invoke. If the command was invoked in DM, then it is ``discord.ClientUser``. - - :return: Union[discord.Member, discord.ClientUser] - """ - return self.guild.me if self.guild is not None else self.bot.user - - async def defer(self, hidden: bool = False): - """ - 'Defers' the response, showing a loading state to the user - - :param hidden: Whether the deferred response should be ephemeral . Default ``False``. - """ - if self.deferred or self.responded: - raise error.AlreadyResponded("You have already responded to this command!") - base = {"type": 5} - if hidden: - base["data"] = {"flags": 64} - self._deferred_hidden = True - await self._http.post_initial_response(base, self.interaction_id, self._token) - self.deferred = True - - async def send( - self, - content: str = "", - *, - embed: discord.Embed = None, - embeds: typing.List[discord.Embed] = None, - tts: bool = False, - file: discord.File = None, - files: typing.List[discord.File] = None, - allowed_mentions: discord.AllowedMentions = None, - hidden: bool = False, - delete_after: float = None, - components: typing.List[dict] = None, - ) -> model.SlashMessage: - """ - Sends response of the interaction. - - .. warning:: - - Since Release 1.0.9, this is completely changed. If you are migrating from older version, please make sure to fix the usage. - - You can't use both ``embed`` and ``embeds`` at the same time, also applies to ``file`` and ``files``. - - If you send files in the initial response, this will defer if it's not been deferred, and then PATCH with the message - - :param content: Content of the response. - :type content: str - :param embed: Embed of the response. - :type embed: discord.Embed - :param embeds: Embeds of the response. Maximum 10. - :type embeds: List[discord.Embed] - :param tts: Whether to speak message using tts. Default ``False``. - :type tts: bool - :param file: File to send. - :type file: discord.File - :param files: Files to send. - :type files: List[discord.File] - :param allowed_mentions: AllowedMentions of the message. - :type allowed_mentions: discord.AllowedMentions - :param hidden: Whether the message is hidden, which means message content will only be seen to the author. - :type hidden: bool - :param delete_after: If provided, the number of seconds to wait in the background before deleting the message we just sent. If the deletion fails, then it is silently ignored. - :type delete_after: float - :param components: Message components in the response. The top level must be made of ActionRows. - :type components: List[dict] - :return: Union[discord.Message, dict] - """ - if embed and embeds: - raise error.IncorrectFormat("You can't use both `embed` and `embeds`!") - if embed: - embeds = [embed] - if embeds: - if not isinstance(embeds, list): - raise error.IncorrectFormat("Provide a list of embeds.") - elif len(embeds) > 10: - raise error.IncorrectFormat("Do not provide more than 10 embeds.") - if file and files: - raise error.IncorrectFormat("You can't use both `file` and `files`!") - if file: - files = [file] - if delete_after and hidden: - raise error.IncorrectFormat("You can't delete a hidden message!") - if components and not all(comp.get("type") == 1 for comp in components): - raise error.IncorrectFormat( - "The top level of the components list must be made of ActionRows!" - ) - - if allowed_mentions is not None: - if self.bot.allowed_mentions is not None: - allowed_mentions = self.bot.allowed_mentions.merge(allowed_mentions).to_dict() - else: - allowed_mentions = allowed_mentions.to_dict() - else: - if self.bot.allowed_mentions is not None: - allowed_mentions = self.bot.allowed_mentions.to_dict() - else: - allowed_mentions = {} - - base = { - "content": content, - "tts": tts, - "embeds": [x.to_dict() for x in embeds] if embeds else [], - "allowed_mentions": allowed_mentions, - "components": components or [], - } - if hidden: - base["flags"] = 64 - - initial_message = False - if not self.responded: - initial_message = True - if files and not self.deferred: - await self.defer(hidden=hidden) - if self.deferred: - if self._deferred_hidden != hidden: - self._logger.warning( - "Deferred response might not be what you set it to! (hidden / visible) " - "This is because it was deferred in a different state." - ) - resp = await self._http.edit(base, self._token, files=files) - self.deferred = False - else: - json_data = {"type": 4, "data": base} - await self._http.post_initial_response(json_data, self.interaction_id, self._token) - if not hidden: - resp = await self._http.edit({}, self._token) - else: - resp = {} - self.responded = True - else: - resp = await self._http.post_followup(base, self._token, files=files) - if files: - for file in files: - file.close() - if not hidden: - smsg = model.SlashMessage( - state=self.bot._connection, - data=resp, - channel=self.channel or discord.Object(id=self.channel_id), - _http=self._http, - interaction_token=self._token, - ) - if delete_after: - self.bot.loop.create_task(smsg.delete(delay=delete_after)) - if initial_message: - self.message = smsg - return smsg - else: - return resp - - async def reply( - self, - content: str = "", - *, - embed: discord.Embed = None, - embeds: typing.List[discord.Embed] = None, - tts: bool = False, - file: discord.File = None, - files: typing.List[discord.File] = None, - allowed_mentions: discord.AllowedMentions = None, - hidden: bool = False, - delete_after: float = None, - components: typing.List[dict] = None, - ) -> model.SlashMessage: - """ - Sends response of the interaction. This is currently an alias of the ``.send()`` method. - - .. warning:: - - Since Release 1.0.9, this is completely changed. If you are migrating from older version, please make sure to fix the usage. - - You can't use both ``embed`` and ``embeds`` at the same time, also applies to ``file`` and ``files``. - - If you send files in the initial response, this will defer if it's not been deferred, and then PATCH with the message - - :param content: Content of the response. - :type content: str - :param embed: Embed of the response. - :type embed: discord.Embed - :param embeds: Embeds of the response. Maximum 10. - :type embeds: List[discord.Embed] - :param tts: Whether to speak message using tts. Default ``False``. - :type tts: bool - :param file: File to send. - :type file: discord.File - :param files: Files to send. - :type files: List[discord.File] - :param allowed_mentions: AllowedMentions of the message. - :type allowed_mentions: discord.AllowedMentions - :param hidden: Whether the message is hidden, which means message content will only be seen to the author. - :type hidden: bool - :param delete_after: If provided, the number of seconds to wait in the background before deleting the message we just sent. If the deletion fails, then it is silently ignored. - :type delete_after: float - :param components: Message components in the response. The top level must be made of ActionRows. - :type components: List[dict] - :return: Union[discord.Message, dict] - """ - - return await self.send( - content=content, - embed=embed, - embeds=embeds, - tts=tts, - file=file, - files=files, - allowed_mentions=allowed_mentions, - hidden=hidden, - delete_after=delete_after, - components=components, - ) - - -class SlashContext(InteractionContext): - """ - Context of a slash command. Has all attributes from :class:`InteractionContext`, plus the slash-command-specific ones below. - - :ivar name: Name of the command. - :ivar args: List of processed arguments invoked with the command. - :ivar kwargs: Dictionary of processed arguments invoked with the command. - :ivar subcommand_name: Subcommand of the command. - :ivar subcommand_group: Subcommand group of the command. - :ivar command_id: ID of the command. - """ - - def __init__( - self, - _http: http.SlashCommandRequest, - _json: dict, - _discord: typing.Union[discord.Client, commands.Bot], - logger, - ): - self.name = self.command = self.invoked_with = _json["data"]["name"] - self.args = [] - self.kwargs = {} - self.subcommand_name = self.invoked_subcommand = self.subcommand_passed = None - self.subcommand_group = self.invoked_subcommand_group = self.subcommand_group_passed = None - self.command_id = _json["data"]["id"] - - super().__init__(_http=_http, _json=_json, _discord=_discord, logger=logger) - - @property - def slash(self) -> "client.SlashCommand": - """ - Returns the associated SlashCommand object created during Runtime. - - :return: client.SlashCommand - """ - return self.bot.slash # noqa - - @property - def cog(self) -> typing.Optional[commands.Cog]: - """ - Returns the cog associated with the command invoked, if any. - - :return: Optional[commands.Cog] - """ - - cmd_obj = self.slash.commands[self.command] - - if isinstance(cmd_obj, (model.CogBaseCommandObject, model.CogSubcommandObject)): - return cmd_obj.cog - else: - return None - - async def invoke(self, *args, **kwargs): - """ - Invokes a command with the arguments given.\n - Similar to d.py's `ctx.invoke` function and documentation.\n - - .. note:: - - This does not handle converters, checks, cooldowns, pre-invoke, - or after-invoke hooks in any matter. It calls the internal callback - directly as-if it was a regular function. - - You must take care in passing the proper arguments when - using this function. - - .. warning:: - The first parameter passed **must** be the command being invoked. - While using `ctx.defer`, if the command invoked includes usage of that command, do not invoke - `ctx.defer` before calling this function. It can not defer twice. - - :param args: Args for the command. - :param kwargs: Keyword args for the command. - - :raises: :exc:`TypeError` - """ - - try: - command = args[0] - except IndexError: - raise TypeError("Missing command to invoke.") from None - - ret = await self.slash.invoke_command(func=command, ctx=self, args=kwargs) - return ret - - -class ComponentContext(InteractionContext): - """ - Context of a component interaction. Has all attributes from :class:`InteractionContext`, plus the component-specific ones below. - - :ivar custom_id: The custom ID of the component (has alias component_id). - :ivar component_type: The type of the component. - :ivar component: Component data retrieved from the message. Not available if the origin message was ephemeral. - :ivar origin_message: The origin message of the component. Not available if the origin message was ephemeral. - :ivar origin_message_id: The ID of the origin message. - :ivar selected_options: The options selected (only for selects) - """ - - def __init__( - self, - _http: http.SlashCommandRequest, - _json: dict, - _discord: typing.Union[discord.Client, commands.Bot], - logger, - ): - self.custom_id = self.component_id = _json["data"]["custom_id"] - self.component_type = _json["data"]["component_type"] - super().__init__(_http=_http, _json=_json, _discord=_discord, logger=logger) - self.origin_message = None - self.origin_message_id = int(_json["message"]["id"]) if "message" in _json.keys() else None - - self.component = None - - self._deferred_edit_origin = False - - if self.origin_message_id and (_json["message"]["flags"] & 64) != 64: - self.origin_message = ComponentMessage( - state=self.bot._connection, channel=self.channel, data=_json["message"] - ) - self.component = self.origin_message.get_component(self.custom_id) - - self.selected_options = None - - if self.component_type == 3: - self.selected_options = _json["data"].get("values", []) - - async def defer(self, hidden: bool = False, edit_origin: bool = False, ignore: bool = False): - """ - 'Defers' the response, showing a loading state to the user - - :param hidden: Whether the deferred response should be ephemeral. Default ``False``. - :param edit_origin: Whether the type is editing the origin message. If ``False``, the deferred response will be for a follow up message. Defaults ``False``. - :param ignore: Whether to just ignore and not edit or send response. Using this can avoid showing interaction loading state. Default ``False``. - """ - if self.deferred or self.responded: - raise error.AlreadyResponded("You have already responded to this command!") - - base = {"type": 6 if edit_origin or ignore else 5} - - if edit_origin and ignore: - raise error.IncorrectFormat("'edit_origin' and 'ignore' are mutually exclusive") - - if hidden: - if edit_origin: - raise error.IncorrectFormat( - "'hidden' and 'edit_origin' flags are mutually exclusive" - ) - elif ignore: - self._deferred_hidden = True - else: - base["data"] = {"flags": 64} - self._deferred_hidden = True - - self._deferred_edit_origin = edit_origin - - await self._http.post_initial_response(base, self.interaction_id, self._token) - self.deferred = not ignore - - if ignore: - self.responded = True - - async def send( - self, - content: str = "", - *, - embed: discord.Embed = None, - embeds: typing.List[discord.Embed] = None, - tts: bool = False, - file: discord.File = None, - files: typing.List[discord.File] = None, - allowed_mentions: discord.AllowedMentions = None, - hidden: bool = False, - delete_after: float = None, - components: typing.List[dict] = None, - ) -> model.SlashMessage: - if self.deferred and self._deferred_edit_origin: - self._logger.warning( - "Deferred response might not be what you set it to! (edit origin / send response message) " - "This is because it was deferred with different response type." - ) - return await super().send( - content, - embed=embed, - embeds=embeds, - tts=tts, - file=file, - files=files, - allowed_mentions=allowed_mentions, - hidden=hidden, - delete_after=delete_after, - components=components, - ) - - async def edit_origin(self, **fields): - """ - Edits the origin message of the component. - Refer to :meth:`discord.Message.edit` and :meth:`InteractionContext.send` for fields. - """ - _resp = {} - - try: - content = fields["content"] - except KeyError: - pass - else: - if content is not None: - content = str(content) - _resp["content"] = content - - try: - components = fields["components"] - except KeyError: - pass - else: - if components is None: - _resp["components"] = [] - else: - _resp["components"] = components - - try: - embeds = fields["embeds"] - except KeyError: - # Nope - pass - else: - if not isinstance(embeds, list): - raise error.IncorrectFormat("Provide a list of embeds.") - if len(embeds) > 10: - raise error.IncorrectFormat("Do not provide more than 10 embeds.") - _resp["embeds"] = [e.to_dict() for e in embeds] - - try: - embed = fields["embed"] - except KeyError: - pass - else: - if "embeds" in _resp: - raise error.IncorrectFormat("You can't use both `embed` and `embeds`!") - - if embed is None: - _resp["embeds"] = [] - else: - _resp["embeds"] = [embed.to_dict()] - - file = fields.get("file") - files = fields.get("files") - - if files is not None and file is not None: - raise error.IncorrectFormat("You can't use both `file` and `files`!") - if file: - files = [file] - - allowed_mentions = fields.get("allowed_mentions") - if allowed_mentions is not None: - if self.bot.allowed_mentions is not None: - _resp["allowed_mentions"] = self.bot.allowed_mentions.merge( - allowed_mentions - ).to_dict() - else: - _resp["allowed_mentions"] = allowed_mentions.to_dict() - else: - if self.bot.allowed_mentions is not None: - _resp["allowed_mentions"] = self.bot.allowed_mentions.to_dict() - else: - _resp["allowed_mentions"] = {} - - if not self.responded: - if files and not self.deferred: - await self.defer(edit_origin=True) - if self.deferred: - if not self._deferred_edit_origin: - self._logger.warning( - "Deferred response might not be what you set it to! (edit origin / send response message) " - "This is because it was deferred with different response type." - ) - _json = await self._http.edit(_resp, self._token, files=files) - self.deferred = False - else: # noqa: F841 - json_data = {"type": 7, "data": _resp} - _json = await self._http.post_initial_response( # noqa: F841 - json_data, self.interaction_id, self._token - ) - self.responded = True - else: - raise error.IncorrectFormat("Already responded") - - if files: - for file in files: - file.close() - - # Commented out for now as sometimes (or at least, when not deferred) _json is an empty string? - # self.origin_message = ComponentMessage(state=self.bot._connection, channel=self.channel, - # data=_json) - - -class MenuContext(InteractionContext): - """ - Context of a context menu interaction. Has all attributes from :class:`InteractionContext`, plus the context-specific ones below. - - :ivar context_type: The type of context menu command. - :ivar _resolved: The data set for the context menu. - :ivar target_message: The targeted message of the context menu command if present. Defaults to ``None``. - :ivar target_id: The target ID of the context menu command. - :ivar target_author: The author targeted from the context menu command. - """ - - def __init__( - self, - _http: http.SlashCommandRequest, - _json: dict, - _discord: typing.Union[discord.Client, commands.Bot], - logger, - ): - super().__init__(_http=_http, _json=_json, _discord=_discord, logger=logger) - self.name = self.command = self.invoked_with = _json["data"]["name"] # This exists. - self.context_type = _json["type"] - self._resolved = self.data["resolved"] if "resolved" in self.data.keys() else None - self.target_message = None - self.target_author = None - self.target_id = self.data["target_id"] if "target_id" in self.data.keys() else None - - if self._resolved is not None: - try: - if self._resolved["messages"]: - _msg = [msg for msg in self._resolved["messages"]][0] - self.target_message = model.SlashMessage( - state=self.bot._connection, - channel=_discord.get_channel(self.channel_id), - data=self._resolved["messages"][_msg], - _http=_http, - interaction_token=self._token, - ) - except KeyError: # noqa - pass - - try: - if self.guild and self._resolved["members"]: - _auth = [auth for auth in self._resolved["members"]][0] - # member and user return the same ID - _neudict = self._resolved["members"][_auth] - _neudict["user"] = self._resolved["users"][_auth] - self.target_author = discord.Member( - data=_neudict, - state=self.bot._connection, - guild=self.guild, - ) - else: - _auth = [auth for auth in self._resolved["users"]][0] - self.target_author = discord.User( - data=self._resolved["users"][_auth], state=self.bot._connection - ) - except KeyError: # noqa - pass - - @property - def cog(self) -> typing.Optional[commands.Cog]: - """ - Returns the cog associated with the command invoked, if any. - - :return: Optional[commands.Cog] - """ - - cmd_obj = self.slash.commands[self.command] - - if isinstance(cmd_obj, (model.CogBaseCommandObject, model.CogSubcommandObject)): - return cmd_obj.cog - else: - return None - - async def defer(self, hidden: bool = False, edit_origin: bool = False, ignore: bool = False): - """ - 'Defers' the response, showing a loading state to the user - - :param hidden: Whether the deferred response should be ephemeral. Default ``False``. - :param edit_origin: Whether the type is editing the origin message. If ``False``, the deferred response will be for a follow up message. Defaults ``False``. - :param ignore: Whether to just ignore and not edit or send response. Using this can avoid showing interaction loading state. Default ``False``. - """ - if self.deferred or self.responded: - raise error.AlreadyResponded("You have already responded to this command!") - - base = {"type": 6 if edit_origin or ignore else 5} - - if edit_origin and ignore: - raise error.IncorrectFormat("'edit_origin' and 'ignore' are mutually exclusive") - - if hidden: - if edit_origin: - raise error.IncorrectFormat( - "'hidden' and 'edit_origin' flags are mutually exclusive" - ) - elif ignore: - self._deferred_hidden = True - else: - base["data"] = {"flags": 64} - self._deferred_hidden = True - - self._deferred_edit_origin = edit_origin - - await self._http.post_initial_response(base, self.interaction_id, self._token) - self.deferred = not ignore - - if ignore: - self.responded = True - - async def send( - self, - content: str = "", - *, - embed: discord.Embed = None, - embeds: typing.List[discord.Embed] = None, - tts: bool = False, - file: discord.File = None, - files: typing.List[discord.File] = None, - allowed_mentions: discord.AllowedMentions = None, - hidden: bool = False, - delete_after: float = None, - components: typing.List[dict] = None, - ) -> model.SlashMessage: - if self.deferred and self._deferred_edit_origin: - self._logger.warning( - "Deferred response might not be what you set it to! (edit origin / send response message) " - "This is because it was deferred with different response type." - ) - return await super().send( - content, - embed=embed, - embeds=embeds, - tts=tts, - file=file, - files=files, - allowed_mentions=allowed_mentions, - hidden=hidden, - delete_after=delete_after, - components=components, - ) diff --git a/discord_slash/dpy_overrides.py b/discord_slash/dpy_overrides.py deleted file mode 100644 index 8458a919..00000000 --- a/discord_slash/dpy_overrides.py +++ /dev/null @@ -1,326 +0,0 @@ -import typing - -import discord -from discord import AllowedMentions, File, InvalidArgument, abc, http, utils -from discord.ext import commands -from discord.http import Route - - -class ComponentMessage(discord.Message): - __slots__ = tuple(list(discord.Message.__slots__) + ["components"]) - - def __init__(self, *, state, channel, data): - super().__init__(state=state, channel=channel, data=data) - self.components = data["components"] - - def get_component(self, custom_id: int) -> typing.Optional[dict]: - """ - Returns first component with matching custom_id - - :param custom_id: custom_id of component to get from message components - :return: Optional[dict] - - """ - for row in self.components: - for component in row["components"]: - if "custom_id" in component and component["custom_id"] == custom_id: - return component - - -def new_override(cls, *args, **kwargs): - if isinstance(cls, discord.Message): - return object.__new__(ComponentMessage) - else: - return object.__new__(cls) - - -discord.message.Message.__new__ = new_override - - -def send_files( - self, - channel_id, - *, - files, - content=None, - tts=False, - embed=None, - components=None, - nonce=None, - allowed_mentions=None, - message_reference=None -): - r = Route("POST", "/channels/{channel_id}/messages", channel_id=channel_id) - form = [] - - payload = {"tts": tts} - if content: - payload["content"] = content - if embed: - payload["embed"] = embed - if components: - payload["components"] = components - if nonce: - payload["nonce"] = nonce - if allowed_mentions: - payload["allowed_mentions"] = allowed_mentions - if message_reference: - payload["message_reference"] = message_reference - - form.append({"name": "payload_json", "value": utils.to_json(payload)}) - if len(files) == 1: - file = files[0] - form.append( - { - "name": "file", - "value": file.fp, - "filename": file.filename, - "content_type": "application/octet-stream", - } - ) - else: - for index, file in enumerate(files): - form.append( - { - "name": "file%s" % index, - "value": file.fp, - "filename": file.filename, - "content_type": "application/octet-stream", - } - ) - - return self.request(r, form=form, files=files) - - -def send_message( - self, - channel_id, - content, - *, - tts=False, - embed=None, - components=None, - nonce=None, - allowed_mentions=None, - message_reference=None -): - r = Route("POST", "/channels/{channel_id}/messages", channel_id=channel_id) - payload = {} - - if content: - payload["content"] = content - - if tts: - payload["tts"] = True - - if embed: - payload["embed"] = embed - - if components: - payload["components"] = components - - if nonce: - payload["nonce"] = nonce - - if allowed_mentions: - payload["allowed_mentions"] = allowed_mentions - - if message_reference: - payload["message_reference"] = message_reference - - return self.request(r, json=payload) - - -http.HTTPClient.send_files = send_files -http.HTTPClient.send_message = send_message - - -async def send( - self, - content=None, - *, - tts=False, - embed=None, - file=None, - components=None, - files=None, - delete_after=None, - nonce=None, - allowed_mentions=None, - reference=None, - mention_author=None -): - """|coro| - - Sends a message to the destination with the content given. - - The content must be a type that can convert to a string through ``str(content)``. - If the content is set to ``None`` (the default), then the ``embed`` parameter must - be provided. - - To upload a single file, the ``file`` parameter should be used with a - single :class:`~discord.File` object. To upload multiple files, the ``files`` - parameter should be used with a :class:`list` of :class:`~discord.File` objects. - **Specifying both parameters will lead to an exception**. - - If the ``embed`` parameter is provided, it must be of type :class:`~discord.Embed` and - it must be a rich embed type. - - Parameters - ------------ - content: :class:`str` - The content of the message to send. - tts: :class:`bool` - Indicates if the message should be sent using text-to-speech. - embed: :class:`~discord.Embed` - The rich embed for the content. - file: :class:`~discord.File` - The file to upload. - files: List[:class:`~discord.File`] - A list of files to upload. Must be a maximum of 10. - nonce: :class:`int` - The nonce to use for sending this message. If the message was successfully sent, - then the message will have a nonce with this value. - delete_after: :class:`float` - If provided, the number of seconds to wait in the background - before deleting the message we just sent. If the deletion fails, - then it is silently ignored. - allowed_mentions: :class:`~discord.AllowedMentions` - Controls the mentions being processed in this message. If this is - passed, then the object is merged with :attr:`~discord.Client.allowed_mentions`. - The merging behaviour only overrides attributes that have been explicitly passed - to the object, otherwise it uses the attributes set in :attr:`~discord.Client.allowed_mentions`. - If no object is passed at all then the defaults given by :attr:`~discord.Client.allowed_mentions` - are used instead. - - .. versionadded:: 1.4 - - reference: Union[:class:`~discord.Message`, :class:`~discord.MessageReference`] - A reference to the :class:`~discord.Message` to which you are replying, this can be created using - :meth:`~discord.Message.to_reference` or passed directly as a :class:`~discord.Message`. You can control - whether this mentions the author of the referenced message using the :attr:`~discord.AllowedMentions.replied_user` - attribute of ``allowed_mentions`` or by setting ``mention_author``. - - .. versionadded:: 1.6 - - mention_author: Optional[:class:`bool`] - If set, overrides the :attr:`~discord.AllowedMentions.replied_user` attribute of ``allowed_mentions``. - - .. versionadded:: 1.6 - - Raises - -------- - ~discord.HTTPException - Sending the message failed. - ~discord.Forbidden - You do not have the proper permissions to send the message. - ~discord.InvalidArgument - The ``files`` list is not of the appropriate size, - you specified both ``file`` and ``files``, - or the ``reference`` object is not a :class:`~discord.Message` - or :class:`~discord.MessageReference`. - - Returns - --------- - :class:`~discord.Message` - The message that was sent. - """ - - channel = await self._get_channel() - state = self._state - content = str(content) if content is not None else None - components = components or [] - if embed is not None: - embed = embed.to_dict() - - if allowed_mentions is not None: - if state.allowed_mentions is not None: - allowed_mentions = state.allowed_mentions.merge(allowed_mentions).to_dict() - else: - allowed_mentions = allowed_mentions.to_dict() - else: - allowed_mentions = state.allowed_mentions and state.allowed_mentions.to_dict() - - if mention_author is not None: - allowed_mentions = allowed_mentions or AllowedMentions().to_dict() - allowed_mentions["replied_user"] = bool(mention_author) - - if reference is not None: - try: - reference = reference.to_message_reference_dict() - except AttributeError: - raise InvalidArgument( - "reference parameter must be Message or MessageReference" - ) from None - - if file is not None and files is not None: - raise InvalidArgument("cannot pass both file and files parameter to send()") - - if file is not None: - if not isinstance(file, File): - raise InvalidArgument("file parameter must be File") - - try: - data = await state.http.send_files( - channel.id, - files=[file], - allowed_mentions=allowed_mentions, - content=content, - tts=tts, - embed=embed, - nonce=nonce, - components=components, - message_reference=reference, - ) - finally: - file.close() - - elif files is not None: - if len(files) > 10: - raise InvalidArgument("files parameter must be a list of up to 10 elements") - elif not all(isinstance(file, File) for file in files): - raise InvalidArgument("files parameter must be a list of File") - - try: - data = await state.http.send_files( - channel.id, - files=files, - content=content, - tts=tts, - embed=embed, - nonce=nonce, - allowed_mentions=allowed_mentions, - components=components, - message_reference=reference, - ) - finally: - for f in files: - f.close() - else: - data = await state.http.send_message( - channel.id, - content, - tts=tts, - embed=embed, - components=components, - nonce=nonce, - allowed_mentions=allowed_mentions, - message_reference=reference, - ) - - ret = state.create_message(channel=channel, data=data) - if delete_after is not None: - await ret.delete(delay=delete_after) - return ret - - -async def send_override(context_or_channel, *args, **kwargs): - if isinstance(context_or_channel, commands.Context): - channel = context_or_channel.channel - else: - channel = context_or_channel - - return await send(channel, *args, **kwargs) - - -abc.Messageable.send = send_override diff --git a/discord_slash/error.py b/discord_slash/error.py deleted file mode 100644 index 21d3d438..00000000 --- a/discord_slash/error.py +++ /dev/null @@ -1,96 +0,0 @@ -class SlashCommandError(Exception): - """ - All exceptions of this extension can be captured with this. - - .. note:: - discord.py doesn't trigger `on_command_error` event. Use this extension's `on_slash_command_error`. - """ - - -class RequestFailure(SlashCommandError): - """ - Request to Discord API has failed. - - .. note:: - Since release ``1.0.8``, this is only used at :mod:`.utils.manage_commands`. :class:`.http.SlashCommandRequest` uses - exception from discord.py such as :class:`discord.HTTPException`. - - :ivar status: Status code of failed response. - :ivar msg: Message of failed response. - """ - - def __init__(self, status: int, msg: str): - self.status = status - self.msg = msg - super().__init__(f"Request failed with resp: {self.status} | {self.msg}") - - -class IncorrectFormat(SlashCommandError): - """ - Some formats are incorrect. See Discord API DOCS for proper format. - """ - - -class DuplicateCommand(SlashCommandError): - """ - There is a duplicate command name. - """ - - def __init__(self, name: str): - super().__init__(f"Duplicate command name detected: {name}") - - -class DuplicateCallback(SlashCommandError): - """ - There is a duplicate component callback. - """ - - def __init__(self, message_id: int, custom_id: str, component_type: int): - super().__init__( - f"Duplicate component callback detected: " - f"message ID {message_id or ''}, " - f"custom_id `{custom_id or ''}`, " - f"component_type `{component_type or ''}`" - ) - - -class DuplicateSlashClient(SlashCommandError): - """ - There are duplicate :class:`.SlashCommand` instances. - """ - - -class CheckFailure(SlashCommandError): - """ - Command check has failed. - """ - - -class IncorrectType(SlashCommandError): - """ - Type passed was incorrect - """ - - -class IncorrectGuildIDType(SlashCommandError): - """ - Guild ID type passed was incorrect - """ - - -class IncorrectCommandData(SlashCommandError): - """ - Incorrect data was passed to a slash command data object - """ - - -class AlreadyResponded(SlashCommandError): - """ - The interaction was already responded to - """ - - -class ContextMenuError(SlashCommandError): - """ - Special error given for context menu creation/callback issues. - """ diff --git a/discord_slash/http.py b/discord_slash/http.py deleted file mode 100644 index 2aaf564e..00000000 --- a/discord_slash/http.py +++ /dev/null @@ -1,208 +0,0 @@ -import json -import typing - -import aiohttp -import discord -from discord.http import Route - -from . import error -from .const import BASE_API - - -class CustomRoute(Route): - """discord.py's Route but changed ``BASE`` to use at slash command.""" - - BASE = BASE_API - - -class SlashCommandRequest: - def __init__(self, logger, _discord, application_id): - self.logger = logger - self._discord: typing.Union[discord.Client, discord.AutoShardedClient] = _discord - self._application_id = application_id - - @property - def application_id(self): - return self._application_id or self._discord.user.id - - def put_slash_commands(self, slash_commands: list, guild_id): - """ - Sends a slash command put request to the Discord API - - ``slash_commands`` must contain all the commands - - :param slash_commands: List of all the slash commands to make a put request to discord with. - :param guild_id: ID of the guild to set the commands on. Pass `None` for the global scope. - """ - return self.command_request(method="PUT", guild_id=guild_id, json=slash_commands) - - def remove_slash_command(self, guild_id, cmd_id): - """ - Sends a slash command delete request to Discord API. - - :param guild_id: ID of the guild to remove command. Pass `None` to remove global command. - :param cmd_id: ID of the command. - :return: Response code of the request. - """ - return self.command_request(method="DELETE", guild_id=guild_id, url_ending=f"/{cmd_id}") - - def get_all_commands(self, guild_id=None): - """ - Sends a slash command get request to Discord API for all commands. - - :param guild_id: ID of the guild to get commands. Pass `None` to get global commands. - :return: JSON Response of the request. - """ - return self.command_request(method="GET", guild_id=guild_id) - - def get_all_guild_commands_permissions(self, guild_id): - """ - Sends a slash command get request to Discord API for all permissions of a guild. - - :param guild_id: ID of the target guild to get registered command permissions of. - :return: JSON Response of the request. - """ - return self.command_request(method="GET", guild_id=guild_id, url_ending="/permissions") - - def update_guild_commands_permissions(self, guild_id, perms_dict): - """ - Sends a slash command put request to the Discord API for setting all command permissions of a guild. - - :param guild_id: ID of the target guild to register command permissions. - :return: JSON Response of the request. - """ - return self.command_request( - method="PUT", guild_id=guild_id, json=perms_dict, url_ending="/permissions" - ) - - def add_slash_command( - self, guild_id, cmd_name: str, description: str, options: list = None, context: dict = None - ): - """ - Sends a slash command add request to Discord API. - - :param guild_id: ID of the guild to add command. Pass `None` to add global command. - :param cmd_name: Name of the command. Must be match the regular expression ``^[a-z0-9_-]{1,32}$``. - :param description: Description of the command. - :param options: List of the function. - :param context: Dictionary of context. Sends as separate. - :return: JSON Response of the request. - """ - base = {"name": cmd_name, "description": description, "options": options or []} - if context: - new_base = {"type": context["type"], "name": context["name"]} - return self.command_request(json=new_base, method="POST", guild_id=guild_id) - return self.command_request(json=base, method="POST", guild_id=guild_id) - - def command_request(self, method, guild_id, url_ending="", **kwargs): - r""" - Sends a command request to discord (post, get, delete, etc) - - :param method: HTTP method. - :param guild_id: ID of the guild to make the request on. `None` to make a request on the global scope. - :param url_ending: String to append onto the end of the url. - :param \**kwargs: Kwargs to pass into discord.py's `request function `_ - """ - url = f"/applications/{self.application_id}" - url += "/commands" if not guild_id else f"/guilds/{guild_id}/commands" - url += url_ending - route = CustomRoute(method, url) - return self._discord.http.request(route, **kwargs) - - def post_followup(self, _resp, token, files: typing.List[discord.File] = None): - """ - Sends command followup response POST request to Discord API. - - :param _resp: Command response. - :type _resp: dict - :param token: Command message token. - :param files: Files to send. Default ``None`` - :type files: List[discord.File] - :return: Coroutine - """ - if files: - return self.request_with_files(_resp, files, token, "POST") - return self.command_response(token, True, "POST", json=_resp) - - def post_initial_response(self, _resp, interaction_id, token): - """ - Sends an initial "POST" response to the Discord API. - - :param _resp: Command response. - :type _resp: dict - :param interaction_id: Interaction ID. - :param token: Command message token. - :return: Coroutine - """ - return self.command_response(token, False, "POST", interaction_id, json=_resp) - - def command_response( - self, token, use_webhook, method, interaction_id=None, url_ending="", **kwargs - ): - r""" - Sends a command response to discord (POST, PATCH, DELETE) - - :param token: Interaction token - :param use_webhook: Whether to use webhooks - :param method: The HTTP request to use - :param interaction_id: The id of the interaction - :param url_ending: String to append onto the end of the url. - :param \**kwargs: Kwargs to pass into discord.py's `request function `_ - :return: Coroutine - """ - if not use_webhook and not interaction_id: - raise error.IncorrectFormat( - "Internal Error! interaction_id must be set if use_webhook is False" - ) - req_url = ( - f"/webhooks/{self.application_id}/{token}" - if use_webhook - else f"/interactions/{interaction_id}/{token}/callback" - ) - req_url += url_ending - route = CustomRoute(method, req_url) - return self._discord.http.request(route, **kwargs) - - def request_with_files( - self, _resp, files: typing.List[discord.File], token, method, url_ending="" - ): - - form = aiohttp.FormData() - form.add_field("payload_json", json.dumps(_resp)) - for x in range(len(files)): - name = f"file{x if len(files) > 1 else ''}" - sel = files[x] - form.add_field( - name, sel.fp, filename=sel.filename, content_type="application/octet-stream" - ) - return self.command_response( - token, True, method, data=form, files=files, url_ending=url_ending - ) - - def edit(self, _resp, token, message_id="@original", files: typing.List[discord.File] = None): - """ - Sends edit command response PATCH request to Discord API. - - :param _resp: Edited response. - :type _resp: dict - :param token: Command message token. - :param message_id: Message ID to edit. Default initial message. - :param files: Files. Default ``None`` - :type files: List[discord.File] - :return: Coroutine - """ - req_url = f"/messages/{message_id}" - if files: - return self.request_with_files(_resp, files, token, "PATCH", url_ending=req_url) - return self.command_response(token, True, "PATCH", url_ending=req_url, json=_resp) - - def delete(self, token, message_id="@original"): - """ - Sends delete command response POST request to Discord API. - - :param token: Command message token. - :param message_id: Message ID to delete. Default initial message. - :return: Coroutine - """ - req_url = f"/messages/{message_id}" - return self.command_response(token, True, "DELETE", url_ending=req_url) diff --git a/discord_slash/model.py b/discord_slash/model.py deleted file mode 100644 index 79cbd446..00000000 --- a/discord_slash/model.py +++ /dev/null @@ -1,715 +0,0 @@ -import asyncio -import datetime -import typing -from contextlib import suppress -from enum import IntEnum -from inspect import iscoroutinefunction - -import discord -from discord.ext.commands import BucketType, CommandOnCooldown, CooldownMapping - -from . import error, http -from .dpy_overrides import ComponentMessage - - -class ChoiceData: - """ - Command choice data object - - :ivar name: Name of the choice, this is what the user will see - :ivar value: Values of the choice, this is what discord will return to you - """ - - def __init__(self, name, value): - self.name = name - self.value = value - - def __eq__(self, other): - return isinstance(other, ChoiceData) and self.__dict__ == other.__dict__ - - -class OptionData: - """ - Command option data object - - :ivar name: Name of the option. - :ivar description: Description of the option. - :ivar required: If the option is required. - :ivar choices: A list of :class:`ChoiceData`, cannot be present on subcommand groups - :ivar options: List of :class:`OptionData`, this will be present if it's a subcommand group - """ - - def __init__(self, name, description, required=False, choices=None, options=None, **kwargs): - self.name = name - self.description = description - self.type = kwargs.get("type") - if self.type is None: - raise error.IncorrectCommandData("type is required for options") - self.required = required - if choices is not None: - self.choices = [] - for choice in choices: - self.choices.append(ChoiceData(**choice)) - else: - self.choices = None - - if self.type in (1, 2): - self.options = [] - if options is not None: - for option in options: - self.options.append(OptionData(**option)) - elif self.type == 2: - raise error.IncorrectCommandData( - "Options are required for subcommands / subcommand groups" - ) - - def __eq__(self, other): - return isinstance(other, OptionData) and self.__dict__ == other.__dict__ - - -class CommandData: - """ - Slash command data object - - :ivar name: Name of the command. - :ivar description: Description of the command. - :ivar default_permission: Indicates whether users should have permissions to run this command by default. - :ivar options: List of :class:`OptionData`. - :ivar id: Command id, this is received from discord so may not be present - :ivar application_id: The application id of the bot, required only when the application id and bot id are different. (old bots) - """ - - def __init__( - self, - name, - description=None, - options=None, - default_permission=True, - id=None, - application_id=None, - version=None, - **kwargs, - ): - self.name = name - self.description = description - self.default_permission = default_permission - self.id = id - self.application_id = application_id - self.version = version - if options is not None: - self.options = [] - for option in options: - self.options.append(OptionData(**option)) - else: - self.options = None - - def __eq__(self, other): - if isinstance(other, CommandData): - return ( - self.name == other.name - and self.description == other.description - and self.options == other.options - and self.default_permission == other.default_permission - ) - else: - return False - - -class CallbackObject: - """ - Callback object of this extension. - - .. warning:: - Do not manually init this model. - - :ivar func: The coroutine of the command. - :ivar __commands_checks__: Check of the command. - """ - - def __init__(self, func): - self.func = func - - # Ref https://github.com/Rapptz/discord.py/blob/master/discord/ext/commands/core.py#L1447 - # Since this isn't inherited from `discord.ext.commands.Command`, discord.py's check decorator will - # add checks at this var. - self.__commands_checks__ = [] - if hasattr(self.func, "__commands_checks__"): - self.__commands_checks__ = self.func.__commands_checks__ - - cooldown = None - if hasattr(self.func, "__commands_cooldown__"): - cooldown = self.func.__commands_cooldown__ - try: - self._buckets = CooldownMapping(cooldown) - except TypeError: - self._buckets = CooldownMapping(cooldown, BucketType.default) - - self._max_concurrency = None - if hasattr(self.func, "__commands_max_concurrency__"): - self._max_concurrency = self.func.__commands_max_concurrency__ - - self.on_error = None - - def error(self, coro): - """ - A decorator that registers a coroutine as a local error handler. - - Works the same way as it does in d.py - - :param: :ref:`coroutine ` - The coroutine to register as the local error handler - - :raises: TypeError - The coroutine passed is not a coroutine - """ - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The error handler must be a coroutine.") - self.on_error = coro - return coro - - def _prepare_cooldowns(self, ctx): - """ - Ref https://github.com/Rapptz/discord.py/blob/master/discord/ext/commands/core.py#L765 - """ - if self._buckets.valid: - dt = ctx.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - bucket = self._buckets.get_bucket(ctx, current) - retry_after = bucket.update_rate_limit(current) - if retry_after: - raise CommandOnCooldown(bucket, retry_after) - - async def _concurrency_checks(self, ctx): - """The checks required for cooldown and max concurrency.""" - # max concurrency checks - if self._max_concurrency is not None: - await self._max_concurrency.acquire(ctx) - try: - # cooldown checks - self._prepare_cooldowns(ctx) - except Exception: - if self._max_concurrency is not None: - await self._max_concurrency.release(ctx) - raise - - async def invoke(self, *args, **kwargs): - """ - Invokes the command. - - :param args: Args for the command. - :raises: .error.CheckFailure - """ - can_run = await self.can_run(args[0]) - if not can_run: - raise error.CheckFailure - - await self._concurrency_checks(args[0]) - - # to preventing needing different functions per object, - # this function simply handles cogs - if hasattr(self, "cog"): - return await self.func(self.cog, *args, **kwargs) - return await self.func(*args, **kwargs) - - def is_on_cooldown(self, ctx) -> bool: - """ - Checks whether the command is currently on cooldown. - - Works the same way as it does in d.py - - :param ctx: SlashContext - :type ctx: .context.SlashContext - - :return: bool - indicating if the command is on cooldown. - """ - if not self._buckets.valid: - return False - - bucket = self._buckets.get_bucket(ctx.message) - dt = ctx.message.edited_at or ctx.message.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - return bucket.get_tokens(current) == 0 - - def reset_cooldown(self, ctx): - """ - Resets the cooldown on this command. - - Works the same way as it does in d.py - - :param ctx: SlashContext - :type ctx: .context.SlashContext - """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx.message) - bucket.reset() - - def get_cooldown_retry_after(self, ctx) -> float: - """ - Retrieves the amount of seconds before this command can be tried again. - - Works the same way as it does in d.py - - :param ctx: SlashContext - :type ctx: .context.SlashContext - - :return: float - The amount of time left on this command's cooldown in seconds. If this is ``0.0`` then the command isn't on cooldown. - """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx.message) - dt = ctx.message.edited_at or ctx.message.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - return bucket.get_retry_after(current) - - return 0.0 - - def add_check(self, func): - """ - Adds check to the command. - - :param func: Any callable. Coroutines are supported. - """ - self.__commands_checks__.append(func) - - def remove_check(self, func): - """ - Removes check to the command. - - .. note:: - If the function is not found at the command check, it will ignore. - - :param func: Any callable. Coroutines are supported. - """ - with suppress(ValueError): - self.__commands_checks__.remove(func) - - async def can_run(self, ctx) -> bool: - """ - Whether the command can be run. - - :param ctx: SlashContext for the check running. - :type ctx: .context.SlashContext - :return: bool - """ - res = [ - bool(x(ctx)) if not iscoroutinefunction(x) else bool(await x(ctx)) - for x in self.__commands_checks__ - ] - return False not in res - - -class CommandObject(CallbackObject): - """ - Slash command object of this extension. - - .. warning:: - Do not manually init this model. - - :ivar name: Name of the command. - :ivar description: Description of the command. - :ivar allowed_guild_ids: List of the allowed guild id. - :ivar options: List of the option of the command. Used for `auto_register`. - :ivar connector: Kwargs connector of the command. - """ - - def __init__(self, name, cmd, _type=1): # Let's reuse old command formatting. - super().__init__(cmd["func"]) - self.name = name.lower() if _type == 1 else name - self.description = cmd["description"] - self.allowed_guild_ids = cmd["guild_ids"] or [] - self.options = cmd["api_options"] or [] - self.connector = cmd["connector"] or {} - self._type = _type - - -class BaseCommandObject(CommandObject): - """ - BaseCommand object of this extension. - - .. note:: - This model inherits :class:`.model.CommandObject`, so this has every variables from that. - - .. warning:: - Do not manually init this model. - - :ivar has_subcommands: Indicates whether this base command has subcommands. - :ivar default_permission: Indicates whether users should have permissions to run this command by default. - :ivar permissions: Permissions to restrict use of this command. - """ - - def __init__(self, name, cmd, _type=1): # Let's reuse old command formatting. - super().__init__(name, cmd, _type) - self.has_subcommands = cmd["has_subcommands"] - self.default_permission = cmd["default_permission"] - self.permissions = cmd["api_permissions"] or {} - - -class SubcommandObject(CommandObject): - """ - Subcommand object of this extension. - - .. note:: - This model inherits :class:`.model.CommandObject`, so this has every variables from that. - - .. warning:: - Do not manually init this model. - - :ivar base: Name of the base slash command. - :ivar subcommand_group: Name of the subcommand group. ``None`` if not exist. - :ivar base_description: Description of the base command. - :ivar subcommand_group_description: Description of the subcommand_group. - """ - - def __init__(self, sub, base, name, sub_group=None): - super().__init__(name, sub) - self.base = base.lower() - self.subcommand_group = sub_group.lower() if sub_group else sub_group - self.base_description = sub["base_desc"] - self.subcommand_group_description = sub["sub_group_desc"] - - -class CogBaseCommandObject(BaseCommandObject): - """ - Slash command object but for Cog. - - .. warning:: - Do not manually init this model. - """ - - def __init__(self, *args): - super().__init__(*args) - self.cog = None # Manually set this later. - - -class CogSubcommandObject(SubcommandObject): - """ - Subcommand object but for Cog. - - .. warning:: - Do not manually init this model. - """ - - def __init__(self, base, cmd, sub_group, name, sub): - super().__init__(sub, base, name, sub_group) - self.base_command_data = cmd - self.cog = None # Manually set this later. - - -class ComponentCallbackObject(CallbackObject): - """ - Internal component object. Inherits :class:`CallbackObject`, so it has all variables from it. - - .. warning:: - Do not manually init this model. - - :ivar message_ids: The message IDs registered to this callback. - :ivar custom_ids: The component custom IDs registered to this callback. - :ivar component_type: Type of the component. See `:class.utils.manage_components.ComponentsType` - """ - - def __init__( - self, - func, - message_ids, - custom_ids, - component_type, - ): - if component_type not in (2, 3, None): - raise error.IncorrectFormat(f"Invalid component type `{component_type}`") - - super().__init__(func) - message_ids = set(message_ids) - custom_ids = set(custom_ids) - self.keys = { - (message_id, custom_id) for message_id in message_ids for custom_id in custom_ids - } - - self.component_type = component_type - - async def invoke(self, ctx): - """ - Invokes the component callback. - - :param ctx: The :class:`.context.ComponentContext` for the interaction. - """ - return await super().invoke(ctx) - - -class CogComponentCallbackObject(ComponentCallbackObject): - """ - Component callback object but for Cog. Has all variables from :class:`ComponentCallbackObject`. - - .. warning:: - Do not manually init this model. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.cog = None # Manually set this later. - - -class SlashCommandOptionType(IntEnum): - """ - Equivalent of `ApplicationCommandOptionType `_ in the Discord API. - """ - - SUB_COMMAND = 1 - SUB_COMMAND_GROUP = 2 - STRING = 3 - INTEGER = 4 - BOOLEAN = 5 - USER = 6 - CHANNEL = 7 - ROLE = 8 - MENTIONABLE = 9 - FLOAT = 10 - - @classmethod - def from_type(cls, t: type): - """ - Get a specific SlashCommandOptionType from a type (or object). - - :param t: The type or object to get a SlashCommandOptionType for. - :return: :class:`.model.SlashCommandOptionType` or ``None`` - """ - - if issubclass(t, str): - return cls.STRING - if issubclass(t, bool): - return cls.BOOLEAN - # The check for bool MUST be above the check for integers as booleans subclass integers - if issubclass(t, int): - return cls.INTEGER - if issubclass(t, discord.abc.User): - return cls.USER - if issubclass(t, discord.abc.GuildChannel): - return cls.CHANNEL - if issubclass(t, discord.abc.Role): - return cls.ROLE - if hasattr(typing, "_GenericAlias"): # 3.7 onwards - # Easier than imports - if hasattr(t, "__origin__"): - if t.__origin__ is typing.Union: - # proven in 3.7.8+, 3.8.6+, 3.9+ definitively - return cls.MENTIONABLE - if not hasattr(typing, "_GenericAlias"): # py 3.6 - if isinstance(t, typing._Union): # noqa - return cls.MENTIONABLE - - if issubclass(t, float): - return cls.FLOAT - - -class SlashMessage(ComponentMessage): - """discord.py's :class:`discord.Message` but overridden ``edit`` and ``delete`` to work for slash command.""" - - def __init__(self, *, state, channel, data, _http: http.SlashCommandRequest, interaction_token): - # Yes I know it isn't the best way but this makes implementation simple. - super().__init__(state=state, channel=channel, data=data) - self._http = _http - self.__interaction_token = interaction_token - - async def _slash_edit(self, **fields): - """ - An internal function - """ - _resp = {} - - try: - content = fields["content"] - except KeyError: - pass - else: - if content is not None: - content = str(content) - _resp["content"] = content - - try: - components = fields["components"] - except KeyError: - pass - else: - if components is None: - _resp["components"] = [] - else: - _resp["components"] = components - - try: - embeds = fields["embeds"] - except KeyError: - # Nope - pass - else: - if not isinstance(embeds, list): - raise error.IncorrectFormat("Provide a list of embeds.") - if len(embeds) > 10: - raise error.IncorrectFormat("Do not provide more than 10 embeds.") - _resp["embeds"] = [e.to_dict() for e in embeds] - - try: - embed = fields["embed"] - except KeyError: - pass - else: - if "embeds" in _resp: - raise error.IncorrectFormat("You can't use both `embed` and `embeds`!") - - if embed is None: - _resp["embeds"] = [] - else: - _resp["embeds"] = [embed.to_dict()] - - file = fields.get("file") - files = fields.get("files") - - if files is not None and file is not None: - raise error.IncorrectFormat("You can't use both `file` and `files`!") - if file: - files = [file] - - allowed_mentions = fields.get("allowed_mentions") - if allowed_mentions is not None: - if self._state.allowed_mentions is not None: - _resp["allowed_mentions"] = self._state.allowed_mentions.merge( - allowed_mentions - ).to_dict() - else: - _resp["allowed_mentions"] = allowed_mentions.to_dict() - else: - if self._state.allowed_mentions is not None: - _resp["allowed_mentions"] = self._state.allowed_mentions.to_dict() - else: - _resp["allowed_mentions"] = {} - - await self._http.edit(_resp, self.__interaction_token, self.id, files=files) - - delete_after = fields.get("delete_after") - if delete_after: - await self.delete(delay=delete_after) - if files: - [x.close() for x in files] - - async def edit(self, **fields): - """Refer :meth:`discord.Message.edit`.""" - if "file" in fields or "files" in fields or "embeds" in fields: - await self._slash_edit(**fields) - else: - try: - await super().edit(**fields) - except discord.Forbidden: - await self._slash_edit(**fields) - - async def delete(self, *, delay=None): - """Refer :meth:`discord.Message.delete`.""" - try: - await super().delete(delay=delay) - except discord.Forbidden: - if not delay: - return await self._http.delete(self.__interaction_token, self.id) - - async def wrap(): - with suppress(discord.HTTPException): - await asyncio.sleep(delay) - await self._http.delete(self.__interaction_token, self.id) - - self._state.loop.create_task(wrap()) - - -class PermissionData: - """ - Single slash permission data. - - :ivar id: User or role id, based on following type specfic. - :ivar type: The ``SlashCommandPermissionsType`` type of this permission. - :ivar permission: State of permission. ``True`` to allow, ``False`` to disallow. - """ - - def __init__(self, id, type, permission, **kwargs): - self.id = id - self.type = type - self.permission = permission - - def __eq__(self, other): - if isinstance(other, PermissionData): - return ( - self.id == other.id - and self.type == other.id - and self.permission == other.permission - ) - else: - return False - - -class GuildPermissionsData: - """ - Slash permissions data for a command in a guild. - - :ivar id: Command id, provided by discord. - :ivar guild_id: Guild id that the permissions are in. - :ivar permissions: List of permissions dict. - """ - - def __init__(self, id, guild_id, permissions, **kwargs): - self.id = id - self.guild_id = guild_id - self.permissions = [] - if permissions: - for permission in permissions: - self.permissions.append(PermissionData(**permission)) - - def __eq__(self, other): - if isinstance(other, GuildPermissionsData): - return ( - self.id == other.id - and self.guild_id == other.guild_id - and self.permissions == other.permissions - ) - else: - return False - - -class SlashCommandPermissionType(IntEnum): - """ - Equivalent of `ApplicationCommandPermissionType `_ in the Discord API. - """ - - ROLE = 1 - USER = 2 - - @classmethod - def from_type(cls, t: type): - if issubclass(t, discord.abc.Role): - return cls.ROLE - if issubclass(t, discord.abc.User): - return cls.USER - - -class ComponentType(IntEnum): - actionrow = 1 - button = 2 - select = 3 - - -class ButtonStyle(IntEnum): - blue = 1 - blurple = 1 - gray = 2 - grey = 2 - green = 3 - red = 4 - URL = 5 - - primary = 1 - secondary = 2 - success = 3 - danger = 4 - - -class ContextMenuType(IntEnum): - CHAT_INPUT = 1 - USER = 2 - MESSAGE = 3 - - # @classmethod - # def from_type(cls, t: type): - # if isinstance(t, discord.Member) or issubclass(t, discord.abc.User): - # return cls.USER - # if issubclass(t, discord.abc.Messageable): - # return cls.MESSAGE diff --git a/discord_slash/utils/__init__.py b/discord_slash/utils/__init__.py deleted file mode 100644 index ca5ea755..00000000 --- a/discord_slash/utils/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -discord-py-slash-command.utils -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Utility functions for slash command. - -:copyright: (c) 2020-2021 eunwoo1104 -:license: MIT -""" diff --git a/discord_slash/utils/manage_commands.py b/discord_slash/utils/manage_commands.py deleted file mode 100644 index 0e99b501..00000000 --- a/discord_slash/utils/manage_commands.py +++ /dev/null @@ -1,405 +0,0 @@ -import asyncio -import inspect -import typing -from collections.abc import Callable -from typing import Union - -import aiohttp - -from ..error import IncorrectType, RequestFailure -from ..model import SlashCommandOptionType, SlashCommandPermissionType - - -async def add_slash_command( - bot_id, bot_token: str, guild_id, cmd_name: str, description: str, options: list = None -): - """ - A coroutine that sends a slash command add request to Discord API. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to add command. Pass `None` to add global command. - :param cmd_name: Name of the command. Must match the regular expression ``^[a-z0-9_-]{1,32}$``. - :param description: Description of the command. - :param options: List of the function. - :return: JSON Response of the request. - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}" - url += "/commands" if not guild_id else f"/guilds/{guild_id}/commands" - base = {"name": cmd_name, "description": description, "options": options or []} - - async with aiohttp.ClientSession() as session: - async with session.post( - url, headers={"Authorization": f"Bot {bot_token}"}, json=base - ) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await add_slash_command( - bot_id, bot_token, guild_id, cmd_name, description, options - ) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -async def remove_slash_command(bot_id, bot_token, guild_id, cmd_id): - """ - A coroutine that sends a slash command remove request to Discord API. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to remove command. Pass `None` to remove global command. - :param cmd_id: ID of the command. - :return: Response code of the request. - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}" - url += "/commands" if not guild_id else f"/guilds/{guild_id}/commands" - url += f"/{cmd_id}" - async with aiohttp.ClientSession() as session: - async with session.delete(url, headers={"Authorization": f"Bot {bot_token}"}) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await remove_slash_command(bot_id, bot_token, guild_id, cmd_id) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return resp.status - - -async def get_all_commands(bot_id, bot_token, guild_id=None): - """ - A coroutine that sends a slash command get request to Discord API. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to get commands. Pass `None` to get all global commands. - :return: JSON Response of the request. - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}" - url += "/commands" if not guild_id else f"/guilds/{guild_id}/commands" - async with aiohttp.ClientSession() as session: - async with session.get(url, headers={"Authorization": f"Bot {bot_token}"}) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await get_all_commands(bot_id, bot_token, guild_id) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -async def remove_all_commands(bot_id, bot_token, guild_ids: typing.List[int] = None): - """ - Remove all slash commands. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_ids: List of the guild ID to remove commands. Pass ``None`` to remove only the global commands. - """ - - await remove_all_commands_in(bot_id, bot_token, None) - - for x in guild_ids or []: - try: - await remove_all_commands_in(bot_id, bot_token, x) - except RequestFailure: - pass - - -async def remove_all_commands_in(bot_id, bot_token, guild_id=None): - """ - Remove all slash commands in area. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to remove commands. Pass `None` to remove all global commands. - """ - commands = await get_all_commands(bot_id, bot_token, guild_id) - - for x in commands: - await remove_slash_command(bot_id, bot_token, guild_id, x["id"]) - - -async def get_all_guild_commands_permissions(bot_id, bot_token, guild_id): - """ - A coroutine that sends a gets all the commands permissions for that guild. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to get permissions. - :return: JSON Response of the request. A list of . - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}/guilds/{guild_id}/commands/permissions" - async with aiohttp.ClientSession() as session: - async with session.get(url, headers={"Authorization": f"Bot {bot_token}"}) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await get_all_guild_commands_permissions(bot_id, bot_token, guild_id) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -async def get_guild_command_permissions(bot_id, bot_token, guild_id, command_id): - """ - A coroutine that sends a request to get a single command's permissions in guild - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to update permissions on. - :param command_id: ID for the command to update permissions on. - :return: JSON Response of the request. A list of - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}/guilds/{guild_id}/commands/{command_id}/permissions" - async with aiohttp.ClientSession() as session: - async with session.get(url, headers={"Authorization": f"Bot {bot_token}"}) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await get_guild_command_permissions(bot_id, bot_token, guild_id, command_id) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -async def update_single_command_permissions(bot_id, bot_token, guild_id, command_id, permissions): - """ - A coroutine that sends a request to update a single command's permissions in guild - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to update permissions on. - :param command_id: ID for the command to update permissions on. - :param permissions: List of permissions for the command. - :return: JSON Response of the request. A list of - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}/guilds/{guild_id}/commands/{command_id}/permissions" - async with aiohttp.ClientSession() as session: - async with session.put( - url, headers={"Authorization": f"Bot {bot_token}"}, json={"permissions": permissions} - ) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await update_single_command_permissions( - bot_id, bot_token, guild_id, command_id, permissions - ) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -async def update_guild_commands_permissions(bot_id, bot_token, guild_id, cmd_permissions): - """ - A coroutine that updates permissions for all commands in a guild. - - :param bot_id: User ID of the bot. - :param bot_token: Token of the bot. - :param guild_id: ID of the guild to update permissions. - :param cmd_permissions: List of dict with permissions for each commands. - :return: JSON Response of the request. A list of . - :raises: :class:`.error.RequestFailure` - Requesting to Discord API has failed. - """ - url = f"https://discord.com/api/v8/applications/{bot_id}/guilds/{guild_id}/commands/permissions" - async with aiohttp.ClientSession() as session: - async with session.put( - url, headers={"Authorization": f"Bot {bot_token}"}, json=cmd_permissions - ) as resp: - if resp.status == 429: - _json = await resp.json() - await asyncio.sleep(_json["retry_after"]) - return await update_guild_commands_permissions( - bot_id, bot_token, guild_id, cmd_permissions - ) - if not 200 <= resp.status < 300: - raise RequestFailure(resp.status, await resp.text()) - return await resp.json() - - -def create_option( - name: str, - description: str, - option_type: typing.Union[int, type], - required: bool, - choices: list = None, -) -> dict: - """ - Creates option used for creating slash command. - - :param name: Name of the option. - :param description: Description of the option. - :param option_type: Type of the option. - :param required: Whether this option is required. - :param choices: Choices of the option. Can be empty. - :return: dict - - .. note:: - An option with ``required=False`` will not pass anything to the command function if the user doesn't pass that option when invoking the command. - You must set the the relevant argument's function to a default argument, eg ``argname = None``. - - .. note:: - ``choices`` must either be a list of `option type dicts `_ - or a list of single string values. - """ - if not isinstance(option_type, int) or isinstance( - option_type, bool - ): # Bool values are a subclass of int - original_type = option_type - option_type = SlashCommandOptionType.from_type(original_type) - if option_type is None: - raise IncorrectType( - f"The type {original_type} is not recognized as a type that Discord accepts for slash commands." - ) - choices = choices or [] - choices = [ - choice if isinstance(choice, dict) else {"name": choice, "value": choice} - for choice in choices - ] - return { - "name": name, - "description": description, - "type": option_type, - "required": required, - "choices": choices, - } - - -def generate_options( - function: Callable, description: str = "No description.", connector: dict = None -) -> list: - """ - Generates a list of options from the type hints of a command. - You currently can type hint: str, int, bool, discord.User, discord.Channel, discord.Role - - .. warning:: - This is automatically used if you do not pass any options directly. It is not recommended to use this. - - :param function: The function callable of the command. - :param description: The default argument description. - :param connector: Kwargs connector of the command. - """ - options = [] - if connector: - connector = {y: x for x, y in connector.items()} # Flip connector. - params = iter(inspect.signature(function).parameters.values()) - if next(params).name in ("self", "cls"): - # Skip 1. (+ 2.) parameter, self/cls and ctx - next(params) - - for param in params: - required = True - if isinstance(param.annotation, str): - # if from __future__ import annotations, then annotations are strings and should be converted back to types - param = param.replace(annotation=eval(param.annotation, function.__globals__)) - - if param.default is not inspect._empty: - required = False - elif getattr(param.annotation, "__origin__", None) is typing.Union: - # Make a command argument optional with typing.Optional[type] or typing.Union[type, None] - args = getattr(param.annotation, "__args__", None) - if args: - param = param.replace(annotation=args[0]) - required = not isinstance(args[-1], type(None)) - - option_type = ( - SlashCommandOptionType.from_type(param.annotation) or SlashCommandOptionType.STRING - ) - name = param.name if not connector else connector[param.name] - options.append(create_option(name, description or "No Description.", option_type, required)) - - return options - - -def create_choice(value: Union[str, int], name: str): - """ - Creates choices used for creating command option. - - :param value: Value of the choice. - :param name: Name of the choice. - :return: dict - """ - return {"value": value, "name": name} - - -def create_permission( - id: int, id_type: typing.Union[int, SlashCommandPermissionType], permission: bool -): - """ - Create a single command permission. - - :param id: Target id to apply the permission on. - :param id_type: Type of the id, :class:`..model.SlashCommandPermissionsType`. - :param permission: State of the permission. ``True`` to allow access, ``False`` to disallow access. - :return: dict - - .. note:: - For @everyone permission, set id_type as role and id as guild id. - """ - if not ( - isinstance(id_type, int) or isinstance(id_type, bool) - ): # Bool values are a subclass of int - original_type = id_type - id_type = SlashCommandPermissionType.from_type(original_type) - if id_type is None: - raise IncorrectType( - f"The type {original_type} is not recognized as a type that Discord accepts for slash command permissions." - ) - return {"id": id, "type": id_type, "permission": permission} - - -def create_multi_ids_permission( - ids: typing.List[int], id_type: typing.Union[int, SlashCommandPermissionType], permission: bool -): - """ - Creates a list of permissions from list of ids with common id_type and permission state. - - :param ids: List of target ids to apply the permission on. - :param id_type: Type of the id. - :param permission: State of the permission. ``True`` to allow access, ``False`` to disallow access. - """ - return [create_permission(id, id_type, permission) for id in set(ids)] - - -def generate_permissions( - allowed_roles: typing.List[int] = None, - allowed_users: typing.List[int] = None, - disallowed_roles: typing.List[int] = None, - disallowed_users: typing.List[int] = None, -): - """ - Creates a list of permissions. - - :param allowed_roles: List of role ids that can access command. - :param allowed_users: List of user ids that can access command. - :param disallowed_roles: List of role ids that should not access command. - :param disallowed_users: List of users ids that should not access command. - :return: list - """ - permissions = [] - - if allowed_roles: - permissions.extend( - create_multi_ids_permission(allowed_roles, SlashCommandPermissionType.ROLE, True) - ) - if allowed_users: - permissions.extend( - create_multi_ids_permission(allowed_users, SlashCommandPermissionType.USER, True) - ) - if disallowed_roles: - permissions.extend( - create_multi_ids_permission(disallowed_roles, SlashCommandPermissionType.ROLE, False) - ) - if disallowed_users: - permissions.extend( - create_multi_ids_permission(disallowed_users, SlashCommandPermissionType.USER, False) - ) - - return permissions diff --git a/discord_slash/utils/manage_components.py b/discord_slash/utils/manage_components.py deleted file mode 100644 index 82a4a787..00000000 --- a/discord_slash/utils/manage_components.py +++ /dev/null @@ -1,323 +0,0 @@ -import logging -import typing -import uuid - -import discord - -from ..context import ComponentContext -from ..error import IncorrectFormat, IncorrectType -from ..model import ButtonStyle, ComponentType - -logger = logging.getLogger("discord_slash") - - -def create_actionrow(*components: dict) -> dict: - """ - Creates an ActionRow for message components. - - :param components: Components to go within the ActionRow. - :return: dict - """ - if not components or len(components) > 5: - raise IncorrectFormat("Number of components in one row should be between 1 and 5.") - if ( - ComponentType.select in [component["type"] for component in components] - and len(components) > 1 - ): - raise IncorrectFormat("Action row must have only one select component and nothing else") - - return {"type": ComponentType.actionrow, "components": components} - - -def spread_to_rows(*components, max_in_row=5) -> typing.List[dict]: - """ - A helper function that spreads your components into ``actionrows`` of a set size - - :param components: Components dicts (buttons or selects or existing actionrows) to spread. Use `None` to explicitly start a new row. - :type components: dict - :param max_in_row: Maximum number of elements in each row. - :type max_in_row: int - :return: list - - .. note:: An action_row can only have a maximum of 5 items in it - """ - if not components or len(components) > 25: - raise IncorrectFormat("Number of components should be between 1 and 25.") - - if max_in_row < 1 or max_in_row > 5: - raise IncorrectFormat("max_in_row should be between 1 and 5.") - - rows = [] - button_row = [] - for component in list(components) + [None]: - if component is not None and component["type"] == ComponentType.button: - button_row.append(component) - - if len(button_row) == max_in_row: - rows.append(create_actionrow(*button_row)) - button_row = [] - - continue - - if button_row: - rows.append(create_actionrow(*button_row)) - button_row = [] - - if component is None: - pass - elif component["type"] == ComponentType.actionrow: - rows.append(component) - elif component["type"] == ComponentType.select: - rows.append(create_actionrow(component)) - - if len(rows) > 5: - raise IncorrectFormat("Number of rows exceeds 5.") - - return rows - - -def emoji_to_dict(emoji: typing.Union[discord.Emoji, discord.PartialEmoji, str]) -> dict: - """ - Converts a default or custom emoji into a partial emoji dict. - - :param emoji: The emoji to convert. - :type emoji: Union[discord.Emoji, discord.PartialEmoji, str] - """ - if isinstance(emoji, discord.Emoji): - emoji = {"name": emoji.name, "id": emoji.id, "animated": emoji.animated} - elif isinstance(emoji, discord.PartialEmoji): - emoji = emoji.to_dict() - elif isinstance(emoji, str): - emoji = {"name": emoji, "id": None} - return emoji if emoji else {} - - -def create_button( - style: typing.Union[ButtonStyle, int], - label: str = None, - emoji: typing.Union[discord.Emoji, discord.PartialEmoji, str] = None, - custom_id: str = None, - url: str = None, - disabled: bool = False, -) -> dict: - """ - Creates a button component for use with the ``components`` field. Must be used within an ``actionRow`` to be used (see :meth:`create_actionrow`). - - .. note:: - At least a label or emoji is required for a button. You can have both, but not neither of them. - - :param style: Style of the button. Refer to :class:`ButtonStyle`. - :type style: Union[ButtonStyle, int] - :param label: The label of the button. - :type label: Optional[str] - :param emoji: The emoji of the button. - :type emoji: Union[discord.Emoji, discord.PartialEmoji, dict] - :param custom_id: The custom_id of the button. Needed for non-link buttons. - :type custom_id: Optional[str] - :param url: The URL of the button. Needed for link buttons. - :type url: Optional[str] - :param disabled: Whether the button is disabled or not. Defaults to `False`. - :type disabled: bool - :returns: :class:`dict` - """ - if style == ButtonStyle.URL: - if custom_id: - raise IncorrectFormat("A link button cannot have a `custom_id`!") - if not url: - raise IncorrectFormat("A link button must have a `url`!") - elif url: - raise IncorrectFormat("You can't have a URL on a non-link button!") - - if not label and not emoji: - raise IncorrectFormat("You must have at least a label or emoji on a button.") - - if custom_id is not None and not isinstance(custom_id, str): - custom_id = str(custom_id) - logger.warning( - "Custom_id has been automatically converted to a string. Please use strings in future\n" - "Note: Discord will always return custom_id as a string" - ) - - emoji = emoji_to_dict(emoji) - - data = { - "type": ComponentType.button, - "style": style, - } - - if label: - data["label"] = label - if emoji: - data["emoji"] = emoji - if disabled: - data["disabled"] = disabled - - if style == ButtonStyle.URL: - data["url"] = url - else: - data["custom_id"] = custom_id or str(uuid.uuid4()) - - return data - - -def create_select_option( - label: str, value: str, emoji=None, description: str = None, default: bool = False -): - """ - Creates an option for select components. - - :param label: The user-facing name of the option that will be displayed in discord client. - :param value: The value that the bot will receive when this option is selected. - :param emoji: The emoji of the option. - :param description: An additional description of the option. - :param default: Whether or not this is the default option. - """ - emoji = emoji_to_dict(emoji) - - if not len(label) or len(label) > 100: - raise IncorrectFormat("Label length should be between 1 and 100.") - if not isinstance(value, str): - value = str(value) - logger.warning( - "Value has been automatically converted to a string. Please use strings in future\n" - "Note: Discord will always return value as a string" - ) - if not len(value) or len(value) > 100: - raise IncorrectFormat("Value length should be between 1 and 100.") - if description is not None and len(description) > 100: - raise IncorrectFormat("Description length must be 100 or lower.") - - return { - "label": label, - "value": value, - "description": description, - "default": default, - "emoji": emoji, - } - - -def create_select( - options: typing.List[dict], - custom_id: str = None, - placeholder: typing.Optional[str] = None, - min_values: typing.Optional[int] = None, - max_values: typing.Optional[int] = None, - disabled: bool = False, -): - """ - Creates a select (dropdown) component for use with the ``components`` field. Must be inside an ActionRow to be used (see :meth:`create_actionrow`). - - :param options: The choices the user can pick from - :param custom_id: A custom identifier, like buttons - :param placeholder: Custom placeholder text if nothing is selected - :param min_values: The minimum number of items that **must** be chosen - :param max_values: The maximum number of items that **can** be chosen - :param disabled: Disables this component. Defaults to ``False``. - """ - if not len(options) or len(options) > 25: - raise IncorrectFormat("Options length should be between 1 and 25.") - - return { - "type": ComponentType.select, - "options": options, - "custom_id": custom_id or str(uuid.uuid4()), - "placeholder": placeholder or "", - "min_values": min_values, - "max_values": max_values, - "disabled": disabled, - } - - -def get_components_ids(component: typing.Union[str, dict, list]) -> typing.Iterator[str]: - """ - Returns generator with the ``custom_id`` of a component or list of components. - - :param component: Custom ID or component dict (actionrow or button) or list of the two. - :returns: typing.Iterator[str] - """ - - if isinstance(component, str): - yield component - elif isinstance(component, dict): - if component["type"] == ComponentType.actionrow: - yield from ( - comp["custom_id"] for comp in component["components"] if "custom_id" in comp - ) - elif "custom_id" in component: - yield component["custom_id"] - elif isinstance(component, list): - # Either list of components (actionrows or buttons) or list of ids - yield from (comp_id for comp in component for comp_id in get_components_ids(comp)) - else: - raise IncorrectType( - f"Unknown component type of {component} ({type(component)}). " - f"Expected str, dict or list" - ) - - -def get_messages_ids(message: typing.Union[int, discord.Message, list]) -> typing.Iterator[int]: - """ - Returns generator with the ``id`` of message or list messages. - - :param message: message ID or message object or list of previous two. - :returns: typing.Iterator[int] - """ - if isinstance(message, int): - yield message - elif isinstance(message, discord.Message): - yield message.id - elif isinstance(message, list): - yield from (msg_id for msg in message for msg_id in get_messages_ids(msg)) - else: - raise IncorrectType( - f"Unknown component type of {message} ({type(message)}). " - f"Expected discord.Message, int or list" - ) - - -async def wait_for_component( - client: discord.Client, - messages: typing.Union[discord.Message, int, list] = None, - components: typing.Union[str, dict, list] = None, - check=None, - timeout=None, -) -> ComponentContext: - """ - Helper function - wrapper around 'client.wait_for("component", ...)' - - Waits for a component interaction. Only accepts interactions based on the custom ID of the component or/and message ID, and optionally a check function. - - :param client: The client/bot object. - :type client: :class:`discord.Client` - :param messages: The message object to check for, or the message ID or list of previous two. - :type messages: Union[discord.Message, int, list] - :param components: Custom ID to check for, or component dict (actionrow or button) or list of previous two. - :type components: Union[str, dict, list] - :param check: Optional check function. Must take a `ComponentContext` as the first parameter. - :param timeout: The number of seconds to wait before timing out and raising :exc:`asyncio.TimeoutError`. - :raises: :exc:`asyncio.TimeoutError` - """ - - if not (messages or components): - raise IncorrectFormat("You must specify messages or components (or both)") - - message_ids = list(get_messages_ids(messages)) if messages else None - custom_ids = list(get_components_ids(components)) if components else None - - # automatically convert improper custom_ids - if custom_ids and not all(isinstance(x, str) for x in custom_ids): - custom_ids = [str(i) for i in custom_ids] - logger.warning( - "Custom_ids have been automatically converted to a list of strings. Please use lists of strings in future.\n" - "Note: Discord will always return custom_ids as strings" - ) - - def _check(ctx: ComponentContext): - if check and not check(ctx): - return False - # if custom_ids is empty or there is a match - wanted_message = not message_ids or ctx.origin_message_id in message_ids - wanted_component = not custom_ids or ctx.custom_id in custom_ids - return wanted_message and wanted_component - - return await client.wait_for("component", check=_check, timeout=timeout)