diff --git a/webhook_server_container/app.py b/webhook_server_container/app.py index 024f8002..6f80ac36 100644 --- a/webhook_server_container/app.py +++ b/webhook_server_container/app.py @@ -1,12 +1,10 @@ -from typing import Any, Dict import os import sys +from typing import Any, Dict -from fastapi import Request import requests import urllib3 - -from fastapi import FastAPI +from fastapi import FastAPI, Request from webhook_server_container.libs.github_api import ProcessGithubWehook from webhook_server_container.utils.helpers import get_logger_with_params @@ -46,12 +44,13 @@ async def process_webhook(request: Request) -> Dict[str, Any]: except Exception as exp: logger.error(f"Error: {exp}") - exc_type, exc_obj, exc_tb = sys.exc_info() # noqa: F841 + exc_type, _, exc_tb = sys.exc_info() # noqa: F841 msg = f"Error: {exc_type}" if exc_tb is not None: file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename) msg = f"Error: {exc_type}, File: {file_name}, Line: {exc_tb.tb_lineno}" + logger.error(msg) return { "status": requests.codes.server_error, diff --git a/webhook_server_container/libs/github_api.py b/webhook_server_container/libs/github_api.py index 8e5b6e35..1ca27fe5 100644 --- a/webhook_server_container/libs/github_api.py +++ b/webhook_server_container/libs/github_api.py @@ -10,7 +10,7 @@ import time from concurrent.futures import Future, ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple +from typing import Any, Callable, Generator from uuid import uuid4 import requests @@ -22,6 +22,7 @@ from github.Commit import Commit from github.GithubException import UnknownObjectException from github.PullRequest import PullRequest +from github.Repository import Repository from starlette.datastructures import Headers from stringcolor import cs from timeout_sampler import TimeoutExpiredError, TimeoutSampler @@ -90,32 +91,36 @@ class RepositoryNotFoundError(Exception): class 
ProcessGithubWehookError(Exception): - def __init__(self, err: Dict[str, str]): + def __init__(self, err: dict[str, str]): self.err = err def __str__(self) -> str: return f"{self.err}" +class RepositoryNotInitializedError(Exception): + pass + + class ProcessGithubWehook: - def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: + def __init__(self, hook_data: dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: self.logger = logger self.logger.name = "ProcessGithubWehook" self.hook_data = hook_data self.headers = headers + self.repository: Repository | None = None self.repository_name: str = hook_data["repository"]["name"] + self.repository_full_name: str = self.hook_data["repository"]["full_name"] self.parent_committer: str = "" self.jira_track_pr: bool = False self.issue_title: str = "" - self.all_required_status_checks: List[str] = [] + self.all_required_status_checks: list[str] = [] self.x_github_delivery: str = self.headers.get("X-GitHub-Delivery", "") self.github_event: str = self.headers["X-GitHub-Event"] - self.owners_content: Dict[str, Any] = {} + self.owners_content: dict[str, Any] = {} self.config = Config() self.log_prefix = self.prepare_log_prefix() - self._repo_data_from_config() - self.github_app_api = get_repository_github_app_api( config_=self.config, repository_name=self.repository_full_name ) @@ -129,6 +134,8 @@ def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging. ) return + self._repo_data_from_config() + self.github_api, self.token = get_api_with_highest_rate_limit( config=self.config, repository_name=self.repository_name ) @@ -140,6 +147,10 @@ def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging. 
self.logger.error(f"{self.log_prefix} Failed to get GitHub API and token.") return + with contextlib.suppress(NoPullRequestError): + self.pull_request = self._get_pull_request() + self.pull_request_branch = self.pull_request.base.ref + self.repository_by_github_app = get_github_repo_api( github_api=self.github_app_api, repository=self.repository_full_name ) @@ -198,7 +209,9 @@ def process(self) -> None: event_log: str = f"Event type: {self.github_event}. event ID: {self.x_github_delivery}" try: - self.pull_request = self._get_pull_request() + if not self.pull_request: + self.pull_request = self._get_pull_request() + self.log_prefix = self.prepare_log_prefix(pull_request=self.pull_request) self.logger.debug(f"{self.log_prefix} {event_log}") self.last_commit = self._get_last_commit() @@ -259,7 +272,7 @@ def add_api_users_to_auto_verified_and_merged_users(self) -> None: self.auto_verified_and_merged_users.extend([_api[0].get_user().login for _api in apis_and_tokens]) def _get_reposiroty_color_for_log_prefix(self) -> str: - def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: + def _get_random_color(_colors: list[str], _json: dict[str, str]) -> str: color = random.choice(_colors) _json[self.repository_name] = color @@ -268,8 +281,8 @@ def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: return self.repository_name - _all_colors: List[str] = [] - color_json: Dict[str, str] + _all_colors: list[str] = [] + color_json: dict[str, str] _colors_to_exclude = ("blue", "white", "black", "grey") color_file: str = os.path.join(self.config.data_dir, "log-colors.json") @@ -316,7 +329,10 @@ def prepare_log_prefix(self, pull_request: PullRequest | None = None) -> str: ) def process_pull_request_check_run_webhook_data(self) -> None: - _check_run: Dict[str, Any] = self.hook_data["check_run"] + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + + _check_run: dict[str, Any] = 
self.hook_data["check_run"] check_run_name: str = _check_run["name"] if self.hook_data.get("action", "") != "completed": @@ -351,36 +367,52 @@ def process_pull_request_check_run_webhook_data(self) -> None: self.logger.error(f"{self.log_prefix} No pull request found") def _repo_data_from_config(self) -> None: + local_repo_dict: dict[str, Any] = {} config_data = self.config.data # Global repositories configuration repo_data = self.config.repository_data( repository_name=self.repository_name ) # Specific repository configuration - if not repo_data: + if self.github_app_api and hasattr(self, "pull_request"): + repository = self.github_app_api.get_repo(self.repository_full_name) + try: + self.logger.debug(f"{self.log_prefix} getting repository config from {self.pull_request_branch} branch") + content = repository.get_contents(".github-webhook_server.config", ref=self.pull_request_branch) + if isinstance(content, list): + content = content[0] + + local_repo_data: str = content.decoded_content.decode() + local_repo_dict = yaml.safe_load(local_repo_data) + except Exception: + self.logger.debug(f"{self.log_prefix} Failed to get repository config from {self.pull_request_branch} branch") + + if not repo_data and not local_repo_dict: raise RepositoryNotFoundError(f"Repository {self.repository_name} not found in config file") - self.repository_full_name: str = repo_data["name"] - self.github_app_id: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="github-app-id" - ) - self.pypi: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="pypi") + # self.github_app_id: str = get_value_from_dicts( + # primary_dict=repo_data, secondary_dict=config_data, key="github-app-id" + # ) + dicts = { + "primary_dict": local_repo_dict, + "secondary_dict": repo_data, + "third_dict": config_data, + } + self.pypi: dict[str, str] = get_value_from_dicts(**dicts, key="pypi") self.verified_job: bool = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, + **dicts, key="verified-job", return_on_none=True, ) -
self.tox: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="tox") + self.tox: dict[str, str] = get_value_from_dicts(**dicts, key="tox") self.tox_python_version: str = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, + **dicts, key="tox-python-version", return_on_none=None, ) - self.slack_webhook_url: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="slack_webhook_url" + self.slack_webhook_url: str = get_value_from_dicts(**dicts, key="slack_webhook_url") + self.build_and_push_container: dict[str, Any] = get_value_from_dicts( + **dicts, key="container", return_on_none={} ) - self.build_and_push_container: Dict[str, Any] = repo_data.get("container", {}) if self.build_and_push_container: self.container_repository_username: str = self.build_and_push_container["username"] self.container_repository_password: str = self.build_and_push_container["password"] @@ -392,23 +424,20 @@ def _repo_data_from_config(self) -> None: self.container_release: bool = self.build_and_push_container.get("release", False) self.pre_commit: bool = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, + **dicts, key="pre-commit", return_on_none=False, ) self.jira_enabled_repository: bool = False - self.jira_tracking: bool = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="jira-tracking" - ) - self.jira: Dict[str, Any] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="jira") + self.jira_tracking: bool = get_value_from_dicts(**dicts, key="jira-tracking") + self.jira: dict[str, Any] = get_value_from_dicts(**dicts, key="jira") if self.jira_tracking and self.jira: self.jira_server: str = self.jira["server"] self.jira_project: str = self.jira["project"] self.jira_token: str = self.jira["token"] - self.jira_epic: Optional[str] = self.jira.get("epic", "")
self.jira_user_mapping: Dict[str, str] = self.jira.get("user-mapping", {}) + self.jira_epic: str = self.jira.get("epic", "") + self.jira_user_mapping: dict[str, str] = self.jira.get("user-mapping", {}) self.jira_enabled_repository = all([self.jira_server, self.jira_project, self.jira_token]) if not self.jira_enabled_repository: self.logger.error( @@ -416,23 +445,22 @@ def _repo_data_from_config(self) -> None: f"Project: {self.jira_project}, Token: {self.jira_token}" ) - self.auto_verified_and_merged_users: List[str] = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, + self.auto_verified_and_merged_users: list[str] = get_value_from_dicts( + **dicts, key="auto-verified-and-merged-users", return_on_none=[], ) self.can_be_merged_required_labels = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, + **dicts, key="can-be-merged-required-labels", return_on_none=[], ) - self.conventional_title: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="conventional-title" - ) + self.conventional_title: str = get_value_from_dicts(**dicts, key="conventional-title") + + def _get_pull_request(self, number: int | None = None) -> PullRequest: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") - def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: if number: return self.repository.get_pull(number) @@ -442,7 +470,7 @@ def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: except GithubException: continue - commit: Dict[str, Any] = self.hook_data.get("commit", {}) + commit: dict[str, Any] = self.hook_data.get("commit", {}) if commit: commit_obj = self.repository.get_commit(commit["sha"]) with contextlib.suppress(Exception): @@ -456,7 +484,7 @@ def _get_last_commit(self) -> Commit: def label_exists_in_pull_request(self, label: str) -> bool: return any(lb for lb in self.pull_request_labels_names() 
if lb == label) - def pull_request_labels_names(self) -> List[str]: + def pull_request_labels_names(self) -> list[str]: return [lb.name for lb in self.pull_request.labels] if self.pull_request else [] def skip_if_pull_request_already_merged(self) -> bool: @@ -480,6 +508,9 @@ def _remove_label(self, label: str) -> bool: return False def _add_label(self, label: str) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + label = label.strip() if len(label) > 49: self.logger.debug(f"{label} is to long, not adding.") @@ -534,11 +565,17 @@ def _generate_issue_title(self) -> str: def _generate_issue_body(self) -> str: return f"[Auto generated]\nNumber: [#{self.pull_request.number}]" - def is_branch_exists(self, branch: str) -> Branch: + def is_branch_exists(self, branch: str) -> Branch | None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + return self.repository.get_branch(branch) def upload_to_pypi(self, tag_name: str) -> None: def _error(_out: str, _err: str) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + err: str = "Publish to pypi failed" self.logger.error(f"{self.log_prefix} {err} - {_err}, {_out}") self.repository.create_issue( @@ -569,7 +606,7 @@ def _error(_out: str, _err: str) -> None: tar_gz_file = tar_gz_file.strip() - commands: List[str] = [ + commands: list[str] = [ f"uvx {uv_cmd_dir} twine check {_dist_dir}/{tar_gz_file}", f"uvx {uv_cmd_dir} twine upload --username __token__ --password {self.pypi['token']} {_dist_dir}/{tar_gz_file} --skip-existing", ] @@ -588,13 +625,13 @@ def _error(_out: str, _err: str) -> None: self.send_slack_message(message=message, webhook_url=self.slack_webhook_url) @property - def root_reviewers(self) -> List[str]: + def root_reviewers(self) -> list[str]: _reviewers = self.all_approvers_and_reviewers.get(".", 
{}).get("reviewers", []) self.logger.debug(f"{self.log_prefix} ROOT Reviewers: {_reviewers}") return _reviewers @property - def root_approvers(self) -> List[str]: + def root_approvers(self) -> list[str]: _approvers = self.all_approvers_and_reviewers.get(".", {}).get("approvers", []) self.logger.debug(f"{self.log_prefix} ROOT Approvers: {_approvers}") return _approvers @@ -605,7 +642,7 @@ def list_changed_files(self) -> list[str]: def assign_reviewers(self) -> None: self.logger.info(f"{self.log_prefix} Assign reviewers") - _to_add: List[str] = list(set(self.all_reviewers)) + _to_add: list[str] = list(set(self.all_reviewers)) self.logger.debug(f"{self.log_prefix} Reviewers to add: {', '.join(_to_add)}") for reviewer in _to_add: @@ -651,7 +688,10 @@ def add_size_label(self) -> None: self._add_label(label=size_label) def label_by_user_comment( - self, user_requested_label: str, remove: bool, reviewed_user: str, issue_comment_id: int + self, + user_requested_label: str, + remove: bool, + reviewed_user: str, ) -> None: self.logger.debug( f"{self.log_prefix} {DELETE_STR if remove else ADD_STR} " @@ -684,10 +724,10 @@ def set_run_tox_check_queued(self) -> None: def set_run_tox_check_in_progress(self) -> None: return self.set_check_run_status(check_run=TOX_STR, status=IN_PROGRESS_STR) - def set_run_tox_check_failure(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=FAILURE_STR, output=output) - def set_run_tox_check_success(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=SUCCESS_STR, output=output) def set_run_pre_commit_check_queued(self) -> None: @@ -699,13 +739,13 @@ def set_run_pre_commit_check_queued(self) -> None: def set_run_pre_commit_check_in_progress(self) -> None: return 
self.set_check_run_status(check_run=PRE_COMMIT_STR, status=IN_PROGRESS_STR) - def set_run_pre_commit_check_failure(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_failure(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=FAILURE_STR, output=output) - def set_run_pre_commit_check_success(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_success(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=SUCCESS_STR, output=output) - def set_merge_check_queued(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_merge_check_queued(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, status=QUEUED_STR, output=output) def set_merge_check_in_progress(self) -> None: @@ -714,7 +754,7 @@ def set_merge_check_in_progress(self) -> None: def set_merge_check_success(self) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=SUCCESS_STR) - def set_merge_check_failure(self, output: Dict[str, Any]) -> None: + def set_merge_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=FAILURE_STR, output=output) def set_container_build_queued(self) -> None: @@ -726,10 +766,10 @@ def set_container_build_queued(self) -> None: def set_container_build_in_progress(self) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, status=IN_PROGRESS_STR) - def set_container_build_success(self, output: Dict[str, Any]) -> None: + def set_container_build_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=SUCCESS_STR, output=output) - def set_container_build_failure(self, output: Dict[str, Any]) -> None: + def 
set_container_build_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=FAILURE_STR, output=output) def set_python_module_install_queued(self) -> None: @@ -741,10 +781,10 @@ def set_python_module_install_queued(self) -> None: def set_python_module_install_in_progress(self) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, status=IN_PROGRESS_STR) - def set_python_module_install_success(self, output: Dict[str, Any]) -> None: + def set_python_module_install_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=SUCCESS_STR, output=output) - def set_python_module_install_failure(self, output: Dict[str, Any]) -> None: + def set_python_module_install_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=FAILURE_STR, output=output) def set_conventional_title_queued(self) -> None: @@ -753,22 +793,25 @@ def set_conventional_title_queued(self) -> None: def set_conventional_title_in_progress(self) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, status=IN_PROGRESS_STR) - def set_conventional_title_success(self, output: Dict[str, Any]) -> None: + def set_conventional_title_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, conclusion=SUCCESS_STR, output=output) - def set_conventional_title_failure(self, output: Dict[str, Any]) -> None: + def set_conventional_title_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, conclusion=FAILURE_STR, output=output) def set_cherry_pick_in_progress(self) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, status=IN_PROGRESS_STR) - def set_cherry_pick_success(self, output: Dict[str, Any]) -> None: + def 
set_cherry_pick_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=SUCCESS_STR, output=output) - def set_cherry_pick_failure(self, output: Dict[str, Any]) -> None: + def set_cherry_pick_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=FAILURE_STR, output=output) def create_issue_for_new_pull_request(self) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + if self.parent_committer in self.auto_verified_and_merged_users: self.logger.info( f"{self.log_prefix} Committer {self.parent_committer} is part of " @@ -784,6 +827,9 @@ def create_issue_for_new_pull_request(self) -> None: ) def close_issue_for_merged_or_closed_pr(self, hook_action: str) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + for issue in self.repository.get_issues(): if issue.body == self._generate_issue_body(): self.logger.info(f"{self.log_prefix} Closing issue {issue.title} for PR: {self.pull_request.title}") @@ -848,7 +894,7 @@ def process_comment_webhook_data(self) -> None: ) return - _user_commands: List[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] + _user_commands: list[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] user_login: str = self.hook_data["sender"]["login"] for user_command in _user_commands: @@ -862,7 +908,7 @@ def process_pull_request_webhook_data(self) -> None: hook_action: str = self.hook_data["action"] self.logger.info(f"{self.log_prefix} hook_action is: {hook_action}") - pull_request_data: Dict[str, Any] = self.hook_data["pull_request"] + pull_request_data: dict[str, Any] = self.hook_data["pull_request"] self.parent_committer = pull_request_data["user"]["login"] self.pull_request_branch = 
pull_request_data["base"]["ref"] if self.conventional_title: @@ -874,7 +920,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action == "opened": self.logger.info(f"{self.log_prefix} Creating welcome comment") - pull_request_opened_futures: List[Future] = [] + pull_request_opened_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_opened_futures.append( executor.submit(self.pull_request.create_issue_comment, **{"body": self.welcome_msg}) @@ -892,7 +938,7 @@ def process_pull_request_webhook_data(self) -> None: self.logger.error(f"{self.log_prefix} {_exp}") if hook_action == "synchronize": - pull_request_synchronize_futures: List[Future] = [] + pull_request_synchronize_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_synchronize_futures.append(executor.submit(self.remove_labels_when_pull_request_sync)) pull_request_synchronize_futures.append( @@ -933,7 +979,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action in ("labeled", "unlabeled"): _check_for_merge: bool = False - _reviewer: Optional[str] = None + _reviewer: str | None = None action_labeled = hook_action == "labeled" labeled = self.hook_data["label"]["name"].lower() if labeled == CAN_BE_MERGED_STR: @@ -946,7 +992,7 @@ def process_pull_request_webhook_data(self) -> None: if labeled.startswith(CHANGED_REQUESTED_BY_LABEL_PREFIX): _reviewer = labeled.split(CHANGED_REQUESTED_BY_LABEL_PREFIX)[-1] - _approved_output: Dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} + _approved_output: dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} if _reviewer in self.all_approvers: _check_for_merge = True _approved_output["text"] += f"Approved by {_reviewer}.\n" @@ -1061,7 +1107,7 @@ def _run_tox(self) -> None: with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir): rc, out, err = run_command(command=cmd, log_prefix=self.log_prefix) - output: Dict[str, Any] = { + output: dict[str, Any] = { 
"title": "Tox", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1085,7 +1131,7 @@ def _run_pre_commit(self) -> None: with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir): rc, out, err = run_command(command=cmd, log_prefix=self.log_prefix) - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Pre-Commit", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1098,7 +1144,7 @@ def _run_pre_commit(self) -> None: def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) -> None: self.create_comment_reaction(issue_comment_id=issue_comment_id, reaction=REACTIONS.ok) - available_commands: List[str] = [ + available_commands: list[str] = [ COMMAND_RETEST_STR, COMMAND_CHERRY_PICK_STR, COMMAND_ASSIGN_REVIEWERS_STR, @@ -1107,7 +1153,7 @@ def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) COMMAND_ASSIGN_REVIEWER_STR, ] - command_and_args: List[str] = command.split(" ", 1) + command_and_args: list[str] = command.split(" ", 1) _command = command_and_args[0] _args: str = command_and_args[1] if len(command_and_args) > 1 else "" @@ -1140,12 +1186,10 @@ def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) self.check_if_can_be_merged() elif _command == COMMAND_CHERRY_PICK_STR: - self.process_cherry_pick_command( - issue_comment_id=issue_comment_id, command_args=_args, reviewed_user=reviewed_user - ) + self.process_cherry_pick_command(command_args=_args, reviewed_user=reviewed_user) elif _command == COMMAND_RETEST_STR: - self.process_retest_command(issue_comment_id=issue_comment_id, command_args=_args) + self.process_retest_command(command_args=_args) elif _command == BUILD_AND_PUSH_CONTAINER_STR: if self.build_and_push_container: @@ -1191,7 +1235,6 @@ def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) user_requested_label=_command, remove=remove, reviewed_user=reviewed_user, - issue_comment_id=issue_comment_id, 
) def cherry_pick(self, target_branch: str, reviewed_user: str = "") -> None: @@ -1212,7 +1255,7 @@ def cherry_pick(self, target_branch: str, reviewed_user: str = "") -> None: clone_repo_dir = f"{self.clone_repo_dir}-{uuid4()}" git_cmd = f"git --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" hub_cmd = f"GITHUB_TOKEN={self.token} hub --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" - commands: List[str] = [ + commands: list[str] = [ f"{git_cmd} checkout {target_branch}", f"{git_cmd} pull origin {target_branch}", f"{git_cmd} checkout -b {new_branch_name} origin/{target_branch}", @@ -1264,6 +1307,9 @@ def label_all_opened_pull_requests_merge_state_after_merged(self) -> None: If the mergeable state is 'behind', the 'needs rebase' label is added. If the mergeable state is 'dirty', the 'has conflicts' label is added. """ + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + time_sleep = 30 self.logger.info(f"{self.log_prefix} Sleep for {time_sleep} seconds before getting all opened PRs") time.sleep(time_sleep) @@ -1438,7 +1484,7 @@ def _run_build_container( clone_repo_dir=clone_repo_dir, ): build_rc, build_out, build_err = self.run_podman_command(command=podman_build_cmd, pipe=True) - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Build container", "summary": "", "text": self.get_check_run_text(err=build_err, out=build_out), @@ -1502,7 +1548,7 @@ def _run_install_python_module(self) -> None: log_prefix=self.log_prefix, ) - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Python module installation", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1513,7 +1559,7 @@ def _run_install_python_module(self) -> None: return self.set_python_module_install_failure(output=output) def send_slack_message(self, message: str, webhook_url: str) -> None: - slack_data: Dict[str, str] = {"text": message} + slack_data: dict[str, str] = {"text": 
message} self.logger.info(f"{self.log_prefix} Sending message to slack: {message}") response: requests.Response = requests.post( webhook_url, @@ -1547,7 +1593,7 @@ def create_comment_reaction(self, issue_comment_id: int, reaction: str) -> None: _comment.create_reaction(reaction) def process_opened_or_synchronize_pull_request(self) -> None: - prepare_pull_futures: List[Future] = [] + prepare_pull_futures: list[Future] = [] with ThreadPoolExecutor() as executor: prepare_pull_futures.append(executor.submit(self.assign_reviewers)) prepare_pull_futures.append( @@ -1586,9 +1632,9 @@ def set_check_run_status( check_run: str, status: str = "", conclusion: str = "", - output: Optional[Dict[str, str]] = None, + output: dict[str, str] | None = None, ) -> None: - kwargs: Dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} + kwargs: dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} if status: kwargs["status"] = status @@ -1620,6 +1666,9 @@ def _prepare_cloned_repo_dir( checkout: str = "", tag_name: str = "", ) -> Generator[None, None, None]: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + git_cmd = f"git --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" # Clone the repository @@ -1696,7 +1745,10 @@ def get_story_key_with_jira_connection(self) -> str: return "" return _story_key - def get_branch_required_status_checks(self) -> List[str]: + def get_branch_required_status_checks(self) -> list[str]: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + if self.repository.private: self.logger.info( f"{self.log_prefix} Repository is private, skipping getting branch protection required status checks" @@ -1707,11 +1759,11 @@ def get_branch_required_status_checks(self) -> List[str]: branch_protection = pull_request_branch.get_protection() return branch_protection.required_status_checks.contexts - def 
get_all_required_status_checks(self) -> List[str]: + def get_all_required_status_checks(self) -> list[str]: if not hasattr(self, "pull_request_branch"): self.pull_request_branch = self.pull_request.base.ref - all_required_status_checks: List[str] = [] + all_required_status_checks: list[str] = [] branch_required_status_checks = self.get_branch_required_status_checks() if self.tox: all_required_status_checks.append(TOX_STR) @@ -1769,9 +1821,12 @@ def set_jira_in_pull_request(self) -> None: f"Committer {self.parent_committer} is not in {reviewers_and_approvers}" ) - def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, reviewed_user: str) -> None: - _target_branches: List[str] = command_args.split() - _exits_target_branches: Set[str] = set() + def process_cherry_pick_command(self, command_args: str, reviewed_user: str) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + + _target_branches: list[str] = command_args.split() + _exits_target_branches: set[str] = set() _non_exits_target_branches_msg: str = "" for _target_branch in _target_branches: @@ -1788,7 +1843,7 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, if _exits_target_branches: if not self.pull_request.is_merged(): - cp_labels: List[str] = [ + cp_labels: list[str] = [ f"{CHERRY_PICK_LABEL_PREFIX}{_target_branch}" for _target_branch in _exits_target_branches ] info_msg: str = f""" @@ -1806,11 +1861,11 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, reviewed_user=reviewed_user, ) - def process_retest_command(self, issue_comment_id: int, command_args: str) -> None: - _target_tests: List[str] = command_args.split() - _not_supported_retests: List[str] = [] - _supported_retests: List[str] = [] - _retests_to_func_map: Dict[str, Callable] = { + def process_retest_command(self, command_args: str) -> None: + _target_tests: list[str] = 
command_args.split() + _not_supported_retests: list[str] = [] + _supported_retests: list[str] = [] + _retests_to_func_map: dict[str, Callable] = { TOX_STR: self._run_tox, PRE_COMMIT_STR: self._run_pre_commit, BUILD_CONTAINER_STR: self._run_build_container, @@ -1850,7 +1905,7 @@ def process_retest_command(self, issue_comment_id: int, command_args: str) -> No self.pull_request.create_issue_comment(msg) if _supported_retests: - _retest_to_exec: List[Future] = [] + _retest_to_exec: list[Future] = [] with ThreadPoolExecutor() as executor: for _test in _supported_retests: _retest_to_exec.append(executor.submit(_retests_to_func_map[_test])) @@ -1983,8 +2038,8 @@ def set_pull_request_automerge(self) -> None: self.logger.error(f"{self.log_prefix} Exception while setting auto merge: {exp}") @property - def _current_pull_request_supported_retest(self) -> List[str]: - current_pull_request_supported_retest: List[str] = [] + def _current_pull_request_supported_retest(self) -> list[str]: + current_pull_request_supported_retest: list[str] = [] if self.tox: current_pull_request_supported_retest.append(TOX_STR) @@ -2009,7 +2064,7 @@ def fix_podman_bug(self) -> None: shutil.rmtree("/tmp/storage-run-1000/containers", ignore_errors=True) shutil.rmtree("/tmp/storage-run-1000/libpod/tmp", ignore_errors=True) - def run_podman_command(self, command: str, pipe: bool = False) -> Tuple[bool, str, str]: + def run_podman_command(self, command: str, pipe: bool = False) -> tuple[bool, str, str]: rc, out, err = run_command(command=command, log_prefix=self.log_prefix, pipe=pipe) if rc: @@ -2022,6 +2077,9 @@ def run_podman_command(self, command: str, pipe: bool = False) -> Tuple[bool, st return rc, out, err def get_all_approvers_and_reviewers(self) -> dict[str, dict[str, Any]]: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + # Dictionary mapping OWNERS file paths to their approvers and reviewers _owners: dict[str, dict[str, Any]] 
= {} @@ -2239,6 +2297,9 @@ def _check_if_pr_approved(self, labels: list[str]) -> str: return "" def _add_reviewer_by_user_comment(self, reviewer: str) -> None: + if not self.repository: + raise RepositoryNotInitializedError(f"{self.log_prefix} Repository is not initialized") + reviewer = reviewer.strip("@") self.logger.info(f"{self.log_prefix} Adding reviewer {reviewer} by user comment") @@ -2252,7 +2313,7 @@ def _add_reviewer_by_user_comment(self, reviewer: str) -> None: self.pull_request.create_issue_comment(_err) def conventional_title_check(self) -> None: - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Conventional Title", "summary": "", "text": "", diff --git a/webhook_server_container/utils/github_repository_settings.py b/webhook_server_container/utils/github_repository_settings.py index 200071e2..b8612e77 100644 --- a/webhook_server_container/utils/github_repository_settings.py +++ b/webhook_server_container/utils/github_repository_settings.py @@ -2,7 +2,7 @@ import os from concurrent.futures import Future, ThreadPoolExecutor, as_completed from copy import deepcopy -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable import github from github import Auth, Github, GithubIntegration @@ -47,7 +47,7 @@ def get_branch_sampler(repo: Repository, branch_name: str) -> Branch: def set_branch_protection( branch: Branch, repository: Repository, - required_status_checks: List[str], + required_status_checks: list[str], github_api: Github, ) -> bool: logger = get_logger_with_params(name="github-repository-settings") @@ -101,10 +101,10 @@ def set_repository_settings(repository: Repository) -> None: def get_required_status_checks( repo: Repository, - data: Dict[str, Any], - default_status_checks: List[str], - exclude_status_checks: List[str], -) -> List[str]: + data: dict[str, Any], + default_status_checks: list[str], + exclude_status_checks: list[str], +) -> list[str]: if data.get("tox"): 
default_status_checks.append("tox") @@ -131,9 +131,9 @@ def get_required_status_checks( return default_status_checks -def get_user_configures_status_checks(status_checks: Dict[str, Any]) -> Tuple[List[str], List[str]]: - include_status_checks: List[str] = [] - exclude_status_checks: List[str] = [] +def get_user_configures_status_checks(status_checks: dict[str, Any]) -> tuple[list[str], list[str]]: + include_status_checks: list[str] = [] + exclude_status_checks: list[str] = [] if status_checks: include_status_checks = status_checks.get("include-runs", []) exclude_status_checks = status_checks.get("exclude-runs", []) @@ -145,7 +145,7 @@ def set_repository_labels(repository: Repository) -> str: logger = get_logger_with_params(name="github-repository-settings") logger.info(f"Set repository {repository.name} labels") - repository_labels: Dict[str, Dict[str, Any]] = {} + repository_labels: dict[str, dict[str, Any]] = {} for label in repository.get_labels(): repository_labels[label.name.lower()] = { "object": label, @@ -173,10 +173,10 @@ def set_repositories_settings(config_: Config, github_api: Github) -> None: logger.info("Processing repositories") config_data = config_.data - default_status_checks: List[str] = config_data.get("default-status-checks", []) + [ + default_status_checks: list[str] = config_data.get("default-status-checks", []) + [ CAN_BE_MERGED_STR, ] - docker: Optional[Dict[str, str]] = config_data.get("docker") + docker: dict[str, str] = config_data.get("docker", {}) if docker: logger.info("Login in to docker.io") docker_username: str = docker["username"] @@ -201,13 +201,13 @@ def set_repositories_settings(config_: Config, github_api: Github) -> None: def set_repository( - data: Dict[str, Any], github_api: Github, default_status_checks: List[str] -) -> Tuple[bool, str, Callable]: + data: dict[str, Any], github_api: Github, default_status_checks: list[str] +) -> tuple[bool, str, Callable]: logger = get_logger_with_params(name="github-repository-settings") 
repository: str = data["name"] logger.info(f"Processing repository {repository}") - protected_branches: Dict[str, Any] = data.get("protected-branches", {}) + protected_branches: dict[str, Any] = data.get("protected-branches", {}) repo = _get_github_repo_api(github_api=github_api, repository=repository) if not repo: return False, f"{repository}: Failed to get repository", logger.error @@ -219,7 +219,7 @@ def set_repository( if repo.private: return False, f"{repository}: Repository is private, skipping setting branch settings", logger.warning - futures: List["Future"] = [] + futures: list["Future"] = [] with ThreadPoolExecutor() as executor: for branch_name, status_checks in protected_branches.items(): @@ -272,7 +272,7 @@ def set_all_in_progress_check_runs_to_queued(config_: Config, github_api: Github BUILD_CONTAINER_STR, PRE_COMMIT_STR, ) - futures: List["Future"] = [] + futures: list["Future"] = [] with ThreadPoolExecutor() as executor: for _, data in config_.data["repositories"].items(): @@ -292,8 +292,8 @@ def set_all_in_progress_check_runs_to_queued(config_: Config, github_api: Github def set_repository_check_runs_to_queued( - config_: Config, data: Dict[str, Any], github_api: Github, check_runs: Tuple[str] -) -> Tuple[bool, str, Callable]: + config_: Config, data: dict[str, Any], github_api: Github, check_runs: tuple[str] +) -> tuple[bool, str, Callable]: logger = get_logger_with_params(name="github-repository-settings") repository: str = data["name"] @@ -325,7 +325,7 @@ def set_repository_check_runs_to_queued( return True, f"{repository}: Set check run status to {QUEUED_STR} is done", logger.debug -def get_repository_github_app_api(config_: Config, repository_name: str) -> Optional[Github]: +def get_repository_github_app_api(config_: Config, repository_name: str) -> Github | None: logger = get_logger_with_params(name="github-repository-settings") logger.debug("Getting repositories GitHub app API") diff --git a/webhook_server_container/utils/helpers.py 
b/webhook_server_container/utils/helpers.py index 3a774f70..c3abbaf0 100644 --- a/webhook_server_container/utils/helpers.py +++ b/webhook_server_container/utils/helpers.py @@ -5,35 +5,46 @@ import subprocess from concurrent.futures import Future, as_completed from logging import Logger -from typing import Any, Dict, List, Optional, Tuple +from typing import Any import github from colorama import Fore from github.RateLimit import RateLimit from github.Repository import Repository from simple_logger.logger import get_logger +from timeout_sampler import LOGGER from webhook_server_container.libs.config import Config def get_value_from_dicts( - primary_dict: Dict[Any, Any], - secondary_dict: Dict[Any, Any], + primary_dict: dict[Any, Any], + secondary_dict: dict[Any, Any], key: str, - return_on_none: Optional[Any] = None, + third_dict: dict[Any, Any] | None = None, + return_on_none: Any = None, ) -> Any: """ Get value from two dictionaries. - If value is not found in primary_dict, try to get it from secondary_dict, otherwise return return_on_none. + If value is not found in primary_dict, try to get it from secondary_dict or the third_dict, otherwise return return_on_none. 
""" - return primary_dict.get(key, secondary_dict.get(key, return_on_none)) + third_dict = third_dict if third_dict else {} + value = primary_dict.get(key) + if not value: + value = secondary_dict.get(key) + if not value: + value = third_dict.get(key, return_on_none) -def get_logger_with_params(name: str, repository_name: Optional[str] = "") -> Logger: + LOGGER.debug(f"Got value: {value} for key: {key}") + return value + + +def get_logger_with_params(name: str, repository_name: str = "") -> Logger: _config = Config() config_data = _config.data # Global repositories configuration - repo_data: Dict[str, Any] = {} + repo_data: dict[str, Any] = {} if repository_name: repo_data = _config.repository_data(repository_name=repository_name) # Specific repository configuration @@ -45,7 +56,7 @@ def get_logger_with_params(name: str, repository_name: Optional[str] = "") -> Lo return get_logger(name=name, filename=log_file, level=log_level, file_max_bytes=1048576 * 50) # 50MB -def extract_key_from_dict(key: Any, _dict: Dict[Any, Any]) -> Any: +def extract_key_from_dict(key: Any, _dict: dict[Any, Any]) -> Any: if isinstance(_dict, dict): for _key, _val in _dict.items(): if _key == key: @@ -68,12 +79,12 @@ def run_command( log_prefix: str, verify_stderr: bool = False, shell: bool = False, - timeout: Optional[int] = None, + timeout: int | None = None, capture_output: bool = True, check: bool = False, pipe: bool = False, **kwargs: Any, -) -> Tuple[bool, Any, Any]: +) -> tuple[bool, Any, Any]: """ Run command locally. 
@@ -140,8 +151,8 @@ def run_command( return False, out_decoded, err_decoded -def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> List[Tuple[github.Github, str]]: - apis_and_tokens: List[Tuple[github.Github, str]] = [] +def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> list[tuple[github.Github, str]]: + apis_and_tokens: list[tuple[github.Github, str]] = [] tokens = get_value_from_dicts( primary_dict=config.repository_data(repository_name=repository_name), @@ -158,7 +169,7 @@ def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> def get_api_with_highest_rate_limit( config: Config, repository_name: str = "" -) -> Tuple[github.Github | None, str | None]: +) -> tuple[github.Github | None, str | None]: """ Get API with the highest rate limit @@ -171,10 +182,10 @@ def get_api_with_highest_rate_limit( """ logger = get_logger_with_params(name="helpers") - api: Optional[github.Github] = None - token: Optional[str] = None + api: github.Github | None = None + token: str | None = None _api_user: str = "" - rate_limit: Optional[RateLimit] = None + rate_limit: RateLimit | None = None remaining = 0 @@ -220,9 +231,9 @@ def log_rate_limit(rate_limit: RateLimit, api_user: str) -> None: logger.warning(msg) -def get_future_results(futures: List["Future"]) -> None: +def get_future_results(futures: list["Future"]) -> None: """ - result must return Tuple[bool, str, Callable] when the Callable is Logger function (LOGGER.info, LOGGER.error, etc) + result must return tuple[bool, str, Callable] when the Callable is Logger function (LOGGER.info, LOGGER.error, etc) """ for result in as_completed(futures): _res = result.result()