diff --git a/src/hermes/commands/init/base.py b/src/hermes/commands/init/base.py index aa5d63eb..5937eded 100644 --- a/src/hermes/commands/init/base.py +++ b/src/hermes/commands/init/base.py @@ -6,7 +6,9 @@ import logging import os import re +import shutil import sys +import traceback from dataclasses import dataclass from enum import Enum, auto from importlib import metadata @@ -19,6 +21,7 @@ from requests import HTTPError import hermes.commands.init.util.slim_click as sc +from hermes.commands import marketplace from hermes.commands.base import HermesCommand, HermesPlugin from hermes.commands.init.util import (connect_github, connect_gitlab, connect_zenodo, git_info) @@ -121,7 +124,10 @@ def string_in_file(file_path, search_string: str) -> bool: def get_builtin_plugins(plugin_commands: list[str]) -> dict[str: HermesPlugin]: - """Returns a list of installed HermesPlugins based on a list of related command names.""" + """ + Returns a list of installed HermesPlugins based on a list of related command names. + This is currently not used (we use the marketplace code instead) but maybe later. + """ plugins = {} for plugin_command_name in plugin_commands: entry_point_group = f"hermes.{plugin_command_name}" @@ -156,6 +162,8 @@ class HermesInitCommand(HermesCommand): def __init__(self, parser: argparse.ArgumentParser): super().__init__(parser) self.folder_info: HermesInitFolderInfo = HermesInitFolderInfo() + self.hermes_was_already_installed: bool = False + self.new_created_paths: list[Path] = [] self.tokens: dict = {} self.setup_method: str = "" self.deposit_platform: DepositPlatform = DepositPlatform.Empty @@ -173,8 +181,21 @@ def __init__(self, parser: argparse.ArgumentParser): "deposit_extra_files": "", "push_branch": "main" } + self.hermes_toml_data = { + "harvest": { + "sources": ["cff"] + }, + "deposit": { + "target": "invenio_rdm", + "invenio_rdm": { + "site_url": "", + "access_right": "open" + } + } + } self.plugin_relevant_commands = ["harvest", "deposit"] self.builtin_plugins: dict[str: HermesPlugin] = get_builtin_plugins(self.plugin_relevant_commands) + self.selected_plugins: list[marketplace.PluginInfo] = [] def init_command_parser(self, command_parser: argparse.ArgumentParser) -> None: command_parser.add_argument('--template-branch', nargs=1, default="", @@ -208,37 +229,60 @@ def __call__(self, args: argparse.Namespace) -> None: if args.template_branch != "": self.template_branch = args.template_branch - # Test if init is valid in current folder - self.test_initialization() + try: + # Test if init is valid in current folder + self.test_initialization() + + sc.echo(f"Starting to initialize HERMES in {self.folder_info.absolute_path}\n") + sc.max_steps = 8 + + sc.next_step("Configure HERMES plugins") + self.choose_plugins() + self.integrate_plugins() - sc.echo(f"Starting to initialize HERMES in {self.folder_info.absolute_path}") - sc.max_steps = 7 + sc.next_step("Configure deposition platform and setup method") + self.choose_deposit_platform() + self.integrate_deposit_platform() + self.choose_setup_method() - sc.next_step("Configure deposition platform and setup method") - self.choose_deposit_platform() - self.choose_setup_method() + sc.next_step("Configure HERMES behaviour") + self.choose_push_branch() + self.choose_deposit_files() + + sc.next_step("Create hermes.toml file") + self.create_hermes_toml() - sc.next_step("Configure HERMES behaviour") - self.choose_push_branch() - self.choose_deposit_files() + sc.next_step("Create CITATION.cff file") + self.create_citation_cff() - sc.next_step("Create hermes.toml file") - self.create_hermes_toml() + sc.next_step("Create git CI files") + self.update_gitignore() + self.create_ci_template() - sc.next_step("Create CITATION.cff file") - self.create_citation_cff() + sc.next_step("Connect with deposition platform") + self.connect_deposit_platform() - sc.next_step("Create git CI files") - self.update_gitignore() - self.create_ci_template() + sc.next_step("Connect with git hoster") + self.configure_git_project() - sc.next_step("Connect with deposition platform") - self.connect_deposit_platform() + self.clean_up_files(False) + sc.echo("\nHERMES is now initialized and ready to be used.\n", + formatting=sc.Formats.OKGREEN+sc.Formats.BOLD) - sc.next_step("Connect with git hoster") - self.configure_git_project() + # Nice message on Ctrl+C + except KeyboardInterrupt: + sc.echo("") + sc.echo("HERMES init was aborted.", sc.Formats.WARNING) + self.clean_up_files(True) + sys.exit() - sc.echo("\nHERMES is now initialized and ready to be used.\n", formatting=sc.Formats.OKGREEN+sc.Formats.BOLD) + # Useful message on error + except Exception as e: + sc.echo(f"An error occurred during execution of HERMES init: {e}", + formatting=sc.Formats.FAIL+sc.Formats.BOLD) + sc.debug_info(traceback.format_exc()) + self.clean_up_files(True) + sys.exit(2) def test_initialization(self) -> None: """Test if init is possible and wanted. If not: sys.exit()""" @@ -253,6 +297,7 @@ def test_initialization(self) -> None: # Look at the current folder self.refresh_folder_info() + self.hermes_was_already_installed = self.folder_info.has_hermes_toml # Abort if there is no git if not self.folder_info.has_git_folder: @@ -279,6 +324,7 @@ def test_initialization(self) -> None: if self.git_remote: self.git_remote_url = git_info.get_remote_url(self.git_remote) self.git_hoster = get_git_hoster_from_url(self.git_remote_url) + # Abort with no remote else: sc.echo("Your git project does not have a remote. It is recommended for HERMES to " @@ -306,26 +352,14 @@ def test_initialization(self) -> None: sys.exit() def create_hermes_toml(self) -> None: - """Creates the hermes.toml file based on a dictionary""" - deposit_url = DepositPlatformUrls.get(self.deposit_platform) - default_values = { - "harvest": { - "sources": ["cff"] - }, - "deposit": { - "target": "invenio_rdm", - "invenio_rdm": { - "site_url": deposit_url, - "access_right": "open" - } - } - } - + """Creates the hermes.toml file based on a self.hermes_toml_data""" + hermes_toml_path = Path("hermes.toml") + self.mark_as_new_path(hermes_toml_path) if (not self.folder_info.has_hermes_toml) \ or sc.confirm("Do you want to replace your `hermes.toml` with a new one?", default=True): - with open('hermes.toml', 'w') as toml_file: + with open(hermes_toml_path, 'w') as toml_file: # noinspection PyTypeChecker - toml.dump(default_values, toml_file) + toml.dump(self.hermes_toml_data, toml_file) sc.echo("`hermes.toml` was created.", formatting=sc.Formats.OKGREEN) def create_citation_cff(self) -> None: @@ -354,17 +388,19 @@ def create_citation_cff(self) -> None: def update_gitignore(self) -> None: """Creates .gitignore if there is none and adds '.hermes' to it""" + gitignore_path = Path(".gitignore") + self.mark_as_new_path(gitignore_path) if not self.folder_info.has_gitignore: - open(".gitignore", 'w') + open(gitignore_path, 'w') sc.echo("A new `.gitignore` file was created.", formatting=sc.Formats.OKGREEN) self.refresh_folder_info() if self.folder_info.has_gitignore: - with open(".gitignore", "r") as file: + with open(gitignore_path, "r") as file: gitignore_lines = file.readlines() if any([line.startswith(".hermes") for line in gitignore_lines]): sc.echo("The `.gitignore` file already contains `.hermes/`") else: - with open(".gitignore", "a") as file: + with open(gitignore_path, "a") as file: file.write("# Ignoring all HERMES cache files\n") file.write(".hermes/\n") file.write("hermes.log\n") @@ -379,12 +415,16 @@ def create_ci_template(self) -> None: """Downloads and configures the ci workflow files using templates from the chosen template branch.""" match self.git_hoster: case GitHoster.GitHub: - # TODO Replace this later with the link to the real templates (not the feature branch) template_url = self.get_template_url("TEMPLATE_hermes_github_to_zenodo.yml") - ci_file_folder = ".github/workflows" + ci_file_folder = Path(".github/workflows") ci_file_name = "hermes_github.yml" - Path(ci_file_folder).mkdir(parents=True, exist_ok=True) - ci_file_path = Path(ci_file_folder) / ci_file_name + ci_file_path = ci_file_folder / ci_file_name + # Adding paths to our list + self.mark_as_new_path(Path(".github")) + self.mark_as_new_path(ci_file_folder) + self.mark_as_new_path(ci_file_path) + # Creating folder & ci file + ci_file_folder.mkdir(parents=True, exist_ok=True) download_file_from_url(template_url, ci_file_path) self.configure_ci_template(ci_file_path) sc.echo(f"GitHub CI: File was created at {ci_file_path}", formatting=sc.Formats.OKGREEN) @@ -392,8 +432,15 @@ def create_ci_template(self) -> None: gitlab_ci_template_url = self.get_template_url("TEMPLATE_hermes_gitlab_to_zenodo.yml") hermes_ci_template_url = self.get_template_url("hermes-ci.yml") gitlab_ci_path = Path(".gitlab-ci.yml") - Path("gitlab").mkdir(parents=True, exist_ok=True) - hermes_ci_path = Path("gitlab") / "hermes-ci.yml" + gitlab_folder_path = Path("gitlab") + hermes_ci_path = gitlab_folder_path / "hermes-ci.yml" + # Adding paths to our list + self.mark_as_new_path(gitlab_ci_path) + self.mark_as_new_path(gitlab_folder_path) + self.mark_as_new_path(hermes_ci_path) + # Creating the gitlab folder + gitlab_folder_path.mkdir(parents=True, exist_ok=True) + # Creating / updating gitlab-ci if gitlab_ci_path.exists(): if string_in_file(gitlab_ci_path, "hermes-ci.yml"): sc.echo(f"It seems like your {gitlab_ci_path} file is already configured for hermes.") @@ -404,6 +451,7 @@ def create_ci_template(self) -> None: download_file_from_url(gitlab_ci_template_url, gitlab_ci_path) sc.echo(f"GitLab CI: {gitlab_ci_path} was created.", formatting=sc.Formats.OKGREEN) self.configure_ci_template(gitlab_ci_path) + # Creating hermes-ci download_file_from_url(hermes_ci_template_url, hermes_ci_path) self.configure_ci_template(hermes_ci_path) @@ -424,10 +472,11 @@ def configure_ci_template(self, ci_file_path) -> None: parameters = list(set(re.findall(r'{%(.*?)%}', content))) for parameter in parameters: if parameter in self.ci_parameters: - content = content.replace(f'{{%{parameter}%}}', self.ci_parameters[parameter]) + value = str(self.ci_parameters[parameter]) + content = content.replace(f'{{%{parameter}%}}', value) else: - sc.echo(f"Warning: CI File Parameter {{%{parameter}%}} was not set.", - formatting=sc.Formats.WARNING) + sc.debug_info(f"CI File Parameter {{%{parameter}%}} was not set.", formatting=sc.Formats.WARNING) + content = content.replace(f'{{%{parameter}%}}', '') with open(ci_file_path, 'w') as file: file.write(content) @@ -567,6 +616,11 @@ def choose_deposit_platform(self) -> None: ) self.deposit_platform = deposit_platform_list[deposit_platform_index] + def integrate_deposit_platform(self) -> None: + """Makes changes to the toml data or something else based on the chosen deposit platform.""" + deposit_url = DepositPlatformUrls.get(self.deposit_platform) + self.hermes_toml_data["deposit"]["invenio_rdm"]["site_url"] = deposit_url + def choose_setup_method(self) -> None: """User chooses his desired setup method: Either preferring automatic (if available) or manual.""" setup_method_index = sc.choose( @@ -590,6 +644,62 @@ def connect_deposit_platform(self) -> None: connect_zenodo.setup(using_sandbox=True) self.create_zenodo_token() + def choose_plugins(self) -> None: + """User chooses the plugins he wants to use.""" + plugin_infos: list[marketplace.PluginInfo] = marketplace.get_plugin_infos() + plugins_builtin: list[marketplace.PluginInfo] = list(filter(lambda p: p.builtin, plugin_infos)) + plugins_available: list[marketplace.PluginInfo] = list(filter(lambda p: not p.builtin, plugin_infos)) + plugins_selected: list[marketplace.PluginInfo] = [] + sc.echo("The following plugins are already builtin:") + for info in plugins_builtin: + sc.echo(str(info), formatting=sc.Formats.OKGREEN) + sc.echo("") + while True: + if plugins_selected: + sc.echo("The following plugins are going to be installed:") + for info in plugins_selected: + sc.echo(str(info), formatting=sc.Formats.OKCYAN) + sc.echo("") + if plugins_available: + sc.echo("The following plugins are available for installation:") + for info in plugins_available: + sc.echo(str(info), formatting=sc.Formats.WARNING, no_log=True) + if info.abstract: + sc.echo("-> " + info.abstract, formatting=sc.Formats.ITALIC+sc.Formats.WARNING, no_log=True) + sc.echo("") + else: + self.selected_plugins = plugins_selected + break + no_text = "No further plugins needed" + choice = sc.choose("Do you want to add a plugin?", + [no_text] + [f"Add {p.name}" for p in plugins_available]) + if choice == 0: + self.selected_plugins = plugins_selected + break + else: + chosen_plugin = plugins_available.pop(choice - 1) + plugins_selected.append(chosen_plugin) + + def integrate_plugins(self) -> None: + """ + Plugin installation is added to the ci-parameters. + Also for now we use this method to do custom plugin installation steps. + """ + for plugin_info in self.selected_plugins: + if not plugin_info.is_valid(): + sc.echo(f"Could not install plugin: {plugin_info.name}", formatting=sc.Formats.FAIL) + continue + pip_install = plugin_info.get_pip_install_command() + self.ci_parameters["pip_install_plugins_github"] = \ + self.ci_parameters.get("pip_install_plugins_github", "") + " - run: " + pip_install + "\n" + self.ci_parameters["pip_install_plugins_gitlab"] = \ + self.ci_parameters.get("pip_install_plugins_gitlab", "") + " - " + pip_install + "\n" + match plugin_info.name: + case "hermes-plugin-python": + self.hermes_toml_data["harvest"]["sources"].append("toml") + case "hermes-plugin-git": + self.hermes_toml_data["harvest"]["sources"].append("git") + def no_git_setup(self, start_question: str = "") -> None: """Makes the init for a gitless project (basically just creating hermes.toml)""" if start_question == "": @@ -604,6 +714,7 @@ def no_git_setup(self, start_question: str = "") -> None: sc.next_step("Create CITATION.cff file") self.create_citation_cff() + self.clean_up_files(False) sc.echo("\nHERMES is now initialized (without git integration or CI/CD files).\n", formatting=sc.Formats.OKGREEN) @@ -617,7 +728,11 @@ def choose_push_branch(self) -> None: ] ) if push_choice == 0: - self.ci_parameters["push_branch"] = sc.answer("Enter target branch: ") + branch = sc.answer("Enter target branch: ") + self.ci_parameters["push_branch"] = branch + sc.echo(f"The HERMES pipeline will be activated when you push on {sc.Formats.BOLD.wrap_around(branch)}", + formatting=sc.Formats.OKGREEN) + sc.echo() elif push_choice == 1: sc.echo("Setting up triggering by tags is currently not implemented.", formatting=sc.Formats.WARNING) sc.echo(f"You can visit {TUTORIAL_URL} to set it up manually later-on.", formatting=sc.Formats.WARNING) @@ -673,3 +788,31 @@ def choose_deposit_files(self) -> None: else: for file in self.folder_info.dir_list: sc.echo(f"\t\t{file}", formatting=sc.Formats.OKCYAN) + + def mark_as_new_path(self, path: Path, avoid_existing: bool = True) -> None: + """ + This method should be called directly BEFORE creating a new file in the given Path. + This way we can look if something already exists there to decide later-on if we want to delete it on abort. + """ + if (not avoid_existing) or (not path.exists()): + self.new_created_paths.append(path) + + def clean_up_files(self, aborted: bool) -> None: + """ + This gets called when init is finished (successfully or aborted). + It cleans up unwanted files (like .hermes folder) and everything new when aborted. + """ + sc.echo("Cleaning unused files...") + hidden_hermes_path = Path(".hermes") + if hidden_hermes_path.exists() and hidden_hermes_path.is_dir(): + shutil.rmtree(hidden_hermes_path) + if aborted: + if not self.hermes_was_already_installed: + for path in reversed(self.new_created_paths): + try: + if path.is_dir(): + path.rmdir() + else: + os.remove(path) + except Exception as e: + sc.echo(f"Cleaning Warning: Could not remove {path}. ({e})") diff --git a/src/hermes/commands/init/util/slim_click.py b/src/hermes/commands/init/util/slim_click.py index 5b705a41..2f72626f 100644 --- a/src/hermes/commands/init/util/slim_click.py +++ b/src/hermes/commands/init/util/slim_click.py @@ -61,22 +61,28 @@ def get_log_type(self, default: int = logging.INFO) -> int: return logging.INFO return default + def wrap_around(self, text: str) -> str: + return self.get_ansi() + text + Formats.ENDC.get_ansi() -def echo(text: str, formatting: Formats = Formats.EMPTY, log_as: int = logging.NOTSET): + +def echo(text: str = "", formatting: Formats = Formats.EMPTY, log_as: int = logging.NOTSET, no_log: bool = False): """ Prints the text with the given formatting. If log_as is set or AUTO_LOG_ON_ECHO is true it gets logged as well. :param text: The printed text. :param formatting: You can use the Formats Enum to give the text a special color or formatting. :param log_as: Creates a log entry with the given text if this is set. + :param no_log: Never creates a log entry if True. """ # Get logging type from formatting if AUTO_LOG_ON_ECHO if AUTO_LOG_ON_ECHO and log_as == logging.NOTSET and text != "": log_as = formatting.get_log_type(logging.INFO) # Add text to log if there is a logger - if log_as != logging.NOTSET and default_file_logger: + if log_as != logging.NOTSET and default_file_logger and not no_log: default_file_logger.log(log_as, text) # Format the text for the console if formatting != Formats.EMPTY: + if Formats.ENDC.get_ansi() in text: + text = text.replace(Formats.ENDC.get_ansi(), Formats.ENDC.get_ansi() + formatting.get_ansi()) text = f"{formatting.get_ansi()}{text}{Formats.ENDC.get_ansi()}" # Print it if (log_as != logging.DEBUG) or PRINT_DEBUG: @@ -164,7 +170,9 @@ def next_step(description: str): def create_console_hyperlink(url: str, word: str) -> str: """Use this to have a consistent display of hyperlinks.""" - return f"\033]8;;{url}\033\\{word}\033]8;;\033\\" if USE_FANCY_HYPERLINKS else f"{word} ({url})" + if USE_FANCY_HYPERLINKS: + return f"\033]8;;{url}\033\\{word}\033]8;;\033\\" + return f"{word} ({url})" class ColorLogHandler(logging.Handler): diff --git a/src/hermes/commands/marketplace.py b/src/hermes/commands/marketplace.py index cd7ddc29..680158a3 100644 --- a/src/hermes/commands/marketplace.py +++ b/src/hermes/commands/marketplace.py @@ -13,6 +13,7 @@ from pydantic import BaseModel, ConfigDict, Field from pydantic.alias_generators import to_camel +from hermes.commands.init.util import slim_click from hermes.utils import hermes_doi, hermes_concept_doi, hermes_user_agent MARKETPLACE_URL = "https://hermes.software-metadata.pub/marketplace" @@ -95,6 +96,11 @@ def handle_data(self, data): plugin = SchemaOrgSoftwareApplication.model_validate_json(data) self.plugins.append(plugin) + def parse_plugins_from_url(self, url: str = MARKETPLACE_URL, user_agent: str = hermes_user_agent): + response = requests.get(url, headers={"User-Agent": user_agent}) + response.raise_for_status() + self.feed(response.text) + @cache def _doi_is_version_of_concept_doi(doi: str, concept_doi: str) -> bool: @@ -148,25 +154,23 @@ def _sort_plugins_by_step(plugins: list[SchemaOrgSoftwareApplication]) -> dict[s return sorted_plugins -def main(): - response = requests.get(MARKETPLACE_URL, headers={"User-Agent": hermes_user_agent}) - response.raise_for_status() +def _plugin_loc(_plugin: SchemaOrgSoftwareApplication) -> str: + return ( + "builtin" + if _is_hermes_reference(_plugin.is_part_of) + else (_plugin.url or "") + ) + +def main(): parser = PluginMarketPlaceParser() - parser.feed(response.text) + parser.parse_plugins_from_url(MARKETPLACE_URL, hermes_user_agent) print( "A detailed list of available plugins can be found on the HERMES website at", MARKETPLACE_URL + "." ) - def _plugin_loc(_plugin: SchemaOrgSoftwareApplication) -> str: - return ( - "builtin" - if _is_hermes_reference(_plugin.is_part_of) - else (_plugin.url or "") - ) - if parser.plugins: print() max_name_len = max(map(lambda plugin: len(plugin.name), parser.plugins)) @@ -184,3 +188,63 @@ def _plugin_loc(_plugin: SchemaOrgSoftwareApplication) -> str: if __name__ == "__main__": main() + + +class PluginInfo: + """ + This class contains all the information about a plugin which are needed for the init-Command. + """ + def __init__(self): + self.name: str = "" + self.location: str = "" + self.step: str = "" + self.builtin: bool = True + self.install_url: str = "" + self.abstract: str = "" + + def __str__(self): + step_text = f"[{self.step}]" + return f"{step_text} {slim_click.Formats.BOLD.wrap_around(self.name)} ({self.location})" + + def get_pip_install_command(self) -> str: + """ + Returns the pip install command which can be used to install the plugin. + Tries to extract the project name from the install_url (PyPI-URL) if possible. + Otherwise, it tries to use the location (Git-Project-URL) for the pip install command. + """ + if self.install_url and self.install_url.startswith("https://pypi.org/project/"): + project_name = self.install_url.rstrip("/").removeprefix("https://pypi.org/project/") + return f"pip install {project_name}" + if self.location and self.location.startswith(("https://", "git@", "ssh://")): + git_url = self.location.rstrip("/") + return f"pip install git+{git_url}" + return "" + + def is_valid(self) -> bool: + """ + Returns True if the plugin can be installed. Maybe we'll check the actual repository here later + to make sure that other things are valid too. + """ + return self.get_pip_install_command() != "" + + +def get_plugin_infos() -> list[PluginInfo]: + """ + Returns a List of PluginInfos which are meant to be used by the init-command. + """ + parser = PluginMarketPlaceParser() + parser.parse_plugins_from_url(MARKETPLACE_URL, hermes_user_agent) + infos: list[PluginInfo] = [] + if parser.plugins: + plugins_sorted = _sort_plugins_by_step(parser.plugins) + for step in plugins_sorted.keys(): + for plugin in plugins_sorted[step]: + info = PluginInfo() + info.name = plugin.name + info.step = step + info.location = _plugin_loc(plugin) + info.builtin = info.location == "builtin" + info.install_url = plugin.install_url + info.abstract = plugin.abstract + infos.append(info) + return infos