From 6a413dd338924a9b37a6e23801f23fdccbd6fe41 Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sat, 8 Mar 2025 02:06:09 +0200 Subject: [PATCH 1/4] Refactor Config() --- webhook_server_container/libs/config.py | 23 ++- webhook_server_container/libs/github_api.py | 193 ++++++++---------- webhook_server_container/tests/conftest.py | 5 +- .../tests/test_branch_protection.py | 17 +- .../utils/github_repository_settings.py | 50 ++--- webhook_server_container/utils/helpers.py | 25 +-- webhook_server_container/utils/webhook.py | 13 +- 7 files changed, 153 insertions(+), 173 deletions(-) diff --git a/webhook_server_container/libs/config.py b/webhook_server_container/libs/config.py index 1bbb489e..df91988d 100644 --- a/webhook_server_container/libs/config.py +++ b/webhook_server_container/libs/config.py @@ -1,23 +1,36 @@ import os -from typing import Any, Dict +from typing import Any import yaml class Config: - def __init__(self) -> None: + def __init__(self, repository: str | None = None) -> None: self.data_dir: str = os.environ.get("WEBHOOK_SERVER_DATA_DIR", "/home/podman/data") self.config_path: str = os.path.join(self.data_dir, "config.yaml") self.exists() + self.repository = repository def exists(self) -> None: if not os.path.isfile(self.config_path): raise FileNotFoundError(f"Config file {self.config_path} not found") @property - def data(self) -> Dict[str, Any]: + def data(self) -> dict[str, Any]: with open(self.config_path) as fd: return yaml.safe_load(fd) - def repository_data(self, repository_name: str) -> Dict[str, Any]: - return self.data["repositories"].get(repository_name, {}) + @property + def repository_data(self) -> dict[str, Any]: + return self.data["repositories"].get(self.repository, {}) + + def get_value(self, value: str, return_on_none: Any = None) -> Any: + """ + Get value from config, try first from repository and if not exists get it from root config + """ + _val = self.repository_data.get(value) + + if _val is None: + _val = self.data.get(value) + + return _val or return_on_none diff --git a/webhook_server_container/libs/github_api.py b/webhook_server_container/libs/github_api.py index c88a8b3c..8fc7a559 100644 --- a/webhook_server_container/libs/github_api.py +++ b/webhook_server_container/libs/github_api.py @@ -10,7 +10,7 @@ import time from concurrent.futures import Future, ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple +from typing import Any, Callable, Generator, Optional, Set, Tuple from uuid import uuid4 import requests @@ -76,7 +76,6 @@ get_api_with_highest_rate_limit, get_apis_and_tokes_from_config, get_github_repo_api, - get_value_from_dicts, run_command, ) @@ -90,7 +89,7 @@ class RepositoryNotFoundError(Exception): class ProcessGithubWehookError(Exception): - def __init__(self, err: Dict[str, str]): + def __init__(self, err: dict[str, str]): self.err = err def __str__(self) -> str: @@ -98,21 +97,22 @@ def __str__(self) -> str: class ProcessGithubWehook: - def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: + def __init__(self, hook_data: dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: self.logger = logger self.logger.name = "ProcessGithubWehook" self.hook_data = hook_data self.headers = headers self.repository_name: str = hook_data["repository"]["name"] + self.repository_full_name: str = hook_data["repository"]["full_name"] self.parent_committer: str = "" self.jira_track_pr: bool = False self.issue_title: str = "" - self.all_required_status_checks: List[str] = [] + self.all_required_status_checks: list[str] = [] self.x_github_delivery: str = self.headers.get("X-GitHub-Delivery", "") self.github_event: str = self.headers["X-GitHub-Event"] - self.owners_content: Dict[str, Any] = {} + self.owners_content: dict[str, Any] = {} - self.config = Config() + self.config = Config(repository=self.repository_name) self._repo_data_from_config() self.github_api, self.token, self.api_user = get_api_with_highest_rate_limit( config=self.config, repository_name=self.repository_name @@ -255,11 +255,11 @@ def prepare_retest_wellcome_msg(self) -> str: return " * This repository does not support retest actions" if not retest_msg else retest_msg def add_api_users_to_auto_verified_and_merged_users(self) -> None: - apis_and_tokens = get_apis_and_tokes_from_config(config=self.config, repository_name=self.repository_name) + apis_and_tokens = get_apis_and_tokes_from_config(config=self.config) self.auto_verified_and_merged_users.extend([_api[0].get_user().login for _api in apis_and_tokens]) def _get_reposiroty_color_for_log_prefix(self) -> str: - def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: + def _get_random_color(_colors: list[str], _json: dict[str, str]) -> str: color = random.choice(_colors) _json[self.repository_name] = color @@ -268,8 +268,8 @@ def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: return self.repository_name - _all_colors: List[str] = [] - color_json: Dict[str, str] + _all_colors: list[str] = [] + color_json: dict[str, str] _colors_to_exclude = ("blue", "white", "black", "grey") color_file: str = os.path.join(self.config.data_dir, "log-colors.json") @@ -316,7 +316,7 @@ def prepare_log_prefix(self, pull_request: PullRequest | None = None) -> str: ) def process_pull_request_check_run_webhook_data(self) -> None: - _check_run: Dict[str, Any] = self.hook_data["check_run"] + _check_run: dict[str, Any] = self.hook_data["check_run"] check_run_name: str = _check_run["name"] if self.hook_data.get("action", "") != "completed": @@ -354,36 +354,18 @@ def process_pull_request_check_run_webhook_data(self) -> None: self.logger.error(f"{self.log_prefix} No pull request found") def _repo_data_from_config(self) -> None: - config_data = self.config.data # Global repositories configuration - repo_data = self.config.repository_data( - repository_name=self.repository_name - ) # Specific repository configuration - - if not repo_data: + if not self.config.repository: raise RepositoryNotFoundError(f"Repository {self.repository_name} not found in config file") - self.repository_full_name: str = repo_data["name"] - self.github_app_id: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="github-app-id" - ) - self.pypi: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="pypi") - self.verified_job: bool = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, - key="verified-job", - return_on_none=True, - ) - self.tox: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="tox") - self.tox_python_version: str = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, - key="tox-python-version", - return_on_none=None, - ) - self.slack_webhook_url: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="slack_webhook_url" - ) - self.build_and_push_container: Dict[str, Any] = repo_data.get("container", {}) + self.github_app_id: str = self.config.get_value(value="github-app-id") + + self.pypi: dict[str, str] = self.config.get_value(value="pypi") + self.verified_job: bool = self.config.get_value(value="verified-job", return_on_none=True) + self.tox: dict[str, str] = self.config.get_value(value="tox") + self.tox_python_version: str = self.config.get_value(value="tox-python-version") + self.slack_webhook_url: str = self.config.get_value(value="slack_webhook_url") + + self.build_and_push_container: dict[str, Any] = self.config.get_value(value="container", return_on_none={}) if self.build_and_push_container: self.container_repository_username: str = self.build_and_push_container["username"] self.container_repository_password: str = self.build_and_push_container["password"] @@ -394,24 +376,17 @@ def _repo_data_from_config(self) -> None: self.container_command_args: str = self.build_and_push_container.get("args", "") self.container_release: bool = self.build_and_push_container.get("release", False) - self.pre_commit: bool = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, - key="pre-commit", - return_on_none=False, - ) + self.pre_commit: bool = self.config.get_value(value="pre-commit", return_on_none=False) self.jira_enabled_repository: bool = False - self.jira_tracking: bool = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="jira-tracking" - ) - self.jira: Dict[str, Any] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="jira") + self.jira_tracking: bool = self.config.get_value(value="jira-tracking") + self.jira: dict[str, Any] = self.config.get_value(value="jira") if self.jira_tracking and self.jira: self.jira_server: str = self.jira["server"] self.jira_project: str = self.jira["project"] self.jira_token: str = self.jira["token"] self.jira_epic: Optional[str] = self.jira.get("epic", "") - self.jira_user_mapping: Dict[str, str] = self.jira.get("user-mapping", {}) + self.jira_user_mapping: dict[str, str] = self.jira.get("user-mapping", {}) self.jira_enabled_repository = all([self.jira_server, self.jira_project, self.jira_token]) if not self.jira_enabled_repository: self.logger.error( @@ -419,21 +394,13 @@ def _repo_data_from_config(self) -> None: f"Project: {self.jira_project}, Token: {self.jira_token}" ) - self.auto_verified_and_merged_users: List[str] = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, - key="auto-verified-and-merged-users", - return_on_none=[], - ) - self.can_be_merged_required_labels = get_value_from_dicts( - primary_dict=repo_data, - secondary_dict=config_data, - key="can-be-merged-required-labels", - return_on_none=[], + self.auto_verified_and_merged_users: list[str] = self.config.get_value( + value="auto-verified-and-merged-users", return_on_none=[] ) - self.conventional_title: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="conventional-title" + self.can_be_merged_required_labels = self.config.get_value( + value="can-be-merged-required-labels", return_on_none=[] ) + self.conventional_title: str = self.config.get_value(value="conventional-title") def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: if number: @@ -445,7 +412,7 @@ def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: except GithubException: continue - commit: Dict[str, Any] = self.hook_data.get("commit", {}) + commit: dict[str, Any] = self.hook_data.get("commit", {}) if commit: commit_obj = self.repository.get_commit(commit["sha"]) with contextlib.suppress(Exception): @@ -459,7 +426,7 @@ def _get_last_commit(self) -> Commit: def label_exists_in_pull_request(self, label: str) -> bool: return any(lb for lb in self.pull_request_labels_names() if lb == label) - def pull_request_labels_names(self) -> List[str]: + def pull_request_labels_names(self) -> list[str]: return [lb.name for lb in self.pull_request.labels] if self.pull_request else [] def skip_if_pull_request_already_merged(self) -> bool: @@ -574,7 +541,7 @@ def _issue_on_error(_error: str) -> None: tar_gz_file = tar_gz_file.strip() - commands: List[str] = [ + commands: list[str] = [ f"uvx {uv_cmd_dir} twine check {_dist_dir}/{tar_gz_file}", f"uvx {uv_cmd_dir} twine upload --username __token__ --password {self.pypi['token']} {_dist_dir}/{tar_gz_file} --skip-existing", ] @@ -594,13 +561,13 @@ def _issue_on_error(_error: str) -> None: self.send_slack_message(message=message, webhook_url=self.slack_webhook_url) @property - def root_reviewers(self) -> List[str]: + def root_reviewers(self) -> list[str]: _reviewers = self.all_approvers_and_reviewers.get(".", {}).get("reviewers", []) self.logger.debug(f"{self.log_prefix} ROOT Reviewers: {_reviewers}") return _reviewers @property - def root_approvers(self) -> List[str]: + def root_approvers(self) -> list[str]: _approvers = self.all_approvers_and_reviewers.get(".", {}).get("approvers", []) self.logger.debug(f"{self.log_prefix} ROOT Approvers: {_approvers}") return _approvers @@ -611,7 +578,7 @@ def list_changed_files(self) -> list[str]: def assign_reviewers(self) -> None: self.logger.info(f"{self.log_prefix} Assign reviewers") - _to_add: List[str] = list(set(self.all_reviewers)) + _to_add: list[str] = list(set(self.all_reviewers)) self.logger.debug(f"{self.log_prefix} Reviewers to add: {', '.join(_to_add)}") for reviewer in _to_add: @@ -690,10 +657,10 @@ def set_run_tox_check_queued(self) -> None: def set_run_tox_check_in_progress(self) -> None: return self.set_check_run_status(check_run=TOX_STR, status=IN_PROGRESS_STR) - def set_run_tox_check_failure(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=FAILURE_STR, output=output) - def set_run_tox_check_success(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=SUCCESS_STR, output=output) def set_run_pre_commit_check_queued(self) -> None: @@ -705,13 +672,13 @@ def set_run_pre_commit_check_queued(self) -> None: def set_run_pre_commit_check_in_progress(self) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, status=IN_PROGRESS_STR) - def set_run_pre_commit_check_failure(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_failure(self, output: Optional[dict[str, Any]] = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=FAILURE_STR, output=output) - def set_run_pre_commit_check_success(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_success(self, output: Optional[dict[str, Any]] = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=SUCCESS_STR, output=output) - def set_merge_check_queued(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_merge_check_queued(self, output: Optional[dict[str, Any]] = None) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, status=QUEUED_STR, output=output) def set_merge_check_in_progress(self) -> None: @@ -720,7 +687,7 @@ def set_merge_check_in_progress(self) -> None: def set_merge_check_success(self) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=SUCCESS_STR) - def set_merge_check_failure(self, output: Dict[str, Any]) -> None: + def set_merge_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=FAILURE_STR, output=output) def set_container_build_queued(self) -> None: @@ -732,10 +699,10 @@ def set_container_build_queued(self) -> None: def set_container_build_in_progress(self) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, status=IN_PROGRESS_STR) - def set_container_build_success(self, output: Dict[str, Any]) -> None: + def set_container_build_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=SUCCESS_STR, output=output) - def set_container_build_failure(self, output: Dict[str, Any]) -> None: + def set_container_build_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=FAILURE_STR, output=output) def set_python_module_install_queued(self) -> None: @@ -747,10 +714,10 @@ def set_python_module_install_queued(self) -> None: def set_python_module_install_in_progress(self) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, status=IN_PROGRESS_STR) - def set_python_module_install_success(self, output: Dict[str, Any]) -> None: + def set_python_module_install_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=SUCCESS_STR, output=output) - def set_python_module_install_failure(self, output: Dict[str, Any]) -> None: + def set_python_module_install_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=FAILURE_STR, output=output) def set_conventional_title_queued(self) -> None: @@ -759,19 +726,19 @@ def set_conventional_title_queued(self) -> None: def set_conventional_title_in_progress(self) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, status=IN_PROGRESS_STR) - def set_conventional_title_success(self, output: Dict[str, Any]) -> None: + def set_conventional_title_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, conclusion=SUCCESS_STR, output=output) - def set_conventional_title_failure(self, output: Dict[str, Any]) -> None: + def set_conventional_title_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CONVENTIONAL_TITLE_STR, conclusion=FAILURE_STR, output=output) def set_cherry_pick_in_progress(self) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, status=IN_PROGRESS_STR) - def set_cherry_pick_success(self, output: Dict[str, Any]) -> None: + def set_cherry_pick_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=SUCCESS_STR, output=output) - def set_cherry_pick_failure(self, output: Dict[str, Any]) -> None: + def set_cherry_pick_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=FAILURE_STR, output=output) def create_issue_for_new_pull_request(self) -> None: @@ -854,7 +821,7 @@ def process_comment_webhook_data(self) -> None: ) return - _user_commands: List[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] + _user_commands: list[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] user_login: str = self.hook_data["sender"]["login"] for user_command in _user_commands: @@ -868,7 +835,7 @@ def process_pull_request_webhook_data(self) -> None: hook_action: str = self.hook_data["action"] self.logger.info(f"{self.log_prefix} hook_action is: {hook_action}") - pull_request_data: Dict[str, Any] = self.hook_data["pull_request"] + pull_request_data: dict[str, Any] = self.hook_data["pull_request"] self.parent_committer = pull_request_data["user"]["login"] self.pull_request_branch = pull_request_data["base"]["ref"] if self.conventional_title: @@ -880,7 +847,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action == "opened": self.logger.info(f"{self.log_prefix} Creating welcome comment") - pull_request_opened_futures: List[Future] = [] + pull_request_opened_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_opened_futures.append( executor.submit(self.pull_request.create_issue_comment, **{"body": self.welcome_msg}) @@ -898,7 +865,7 @@ def process_pull_request_webhook_data(self) -> None: self.logger.error(f"{self.log_prefix} {_exp}") if hook_action == "synchronize": - pull_request_synchronize_futures: List[Future] = [] + pull_request_synchronize_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_synchronize_futures.append(executor.submit(self.remove_labels_when_pull_request_sync)) pull_request_synchronize_futures.append( @@ -952,7 +919,7 @@ def process_pull_request_webhook_data(self) -> None: if labeled.startswith(CHANGED_REQUESTED_BY_LABEL_PREFIX): _reviewer = labeled.split(CHANGED_REQUESTED_BY_LABEL_PREFIX)[-1] - _approved_output: Dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} + _approved_output: dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} if _reviewer in self.all_approvers: _check_for_merge = True _approved_output["text"] += f"Approved by {_reviewer}.\n" @@ -1065,7 +1032,7 @@ def _run_tox(self) -> None: self.set_run_tox_check_in_progress() with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir) as _res: - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Tox", "summary": "", "text": None, @@ -1095,7 +1062,7 @@ def _run_pre_commit(self) -> None: cmd = f" uvx --directory {clone_repo_dir} {PRE_COMMIT_STR} run --all-files" self.set_run_pre_commit_check_in_progress() with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir) as _res: - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Pre-Commit", "summary": "", "text": None, @@ -1116,7 +1083,7 @@ def _run_pre_commit(self) -> None: def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) -> None: self.create_comment_reaction(issue_comment_id=issue_comment_id, reaction=REACTIONS.ok) - available_commands: List[str] = [ + available_commands: list[str] = [ COMMAND_RETEST_STR, COMMAND_CHERRY_PICK_STR, COMMAND_ASSIGN_REVIEWERS_STR, @@ -1125,7 +1092,7 @@ def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) COMMAND_ASSIGN_REVIEWER_STR, ] - command_and_args: List[str] = command.split(" ", 1) + command_and_args: list[str] = command.split(" ", 1) _command = command_and_args[0] _args: str = command_and_args[1] if len(command_and_args) > 1 else "" @@ -1230,7 +1197,7 @@ def cherry_pick(self, target_branch: str, reviewed_user: str = "") -> None: clone_repo_dir = f"{self.clone_repo_dir}-{uuid4()}" git_cmd = f"git --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" hub_cmd = f"GITHUB_TOKEN={self.token} hub --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" - commands: List[str] = [ + commands: list[str] = [ f"{git_cmd} checkout {target_branch}", f"{git_cmd} pull origin {target_branch}", f"{git_cmd} checkout -b {new_branch_name} origin/{target_branch}", @@ -1457,7 +1424,7 @@ def _run_build_container( tag_name=tag, clone_repo_dir=clone_repo_dir, ) as _res: - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Build container", "summary": "", "text": None, @@ -1523,7 +1490,7 @@ def _run_install_python_module(self) -> None: with self._prepare_cloned_repo_dir( clone_repo_dir=clone_repo_dir, ) as _res: - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Python module installation", "summary": "", "text": None, @@ -1545,7 +1512,7 @@ def _run_install_python_module(self) -> None: return self.set_python_module_install_failure(output=output) def send_slack_message(self, message: str, webhook_url: str) -> None: - slack_data: Dict[str, str] = {"text": message} + slack_data: dict[str, str] = {"text": message} self.logger.info(f"{self.log_prefix} Sending message to slack: {message}") response: requests.Response = requests.post( webhook_url, @@ -1579,7 +1546,7 @@ def create_comment_reaction(self, issue_comment_id: int, reaction: str) -> None: _comment.create_reaction(reaction) def process_opened_or_synchronize_pull_request(self) -> None: - prepare_pull_futures: List[Future] = [] + prepare_pull_futures: list[Future] = [] with ThreadPoolExecutor() as executor: prepare_pull_futures.append(executor.submit(self.assign_reviewers)) prepare_pull_futures.append( @@ -1618,9 +1585,9 @@ def set_check_run_status( check_run: str, status: str = "", conclusion: str = "", - output: Optional[Dict[str, str]] = None, + output: Optional[dict[str, str]] = None, ) -> None: - kwargs: Dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} + kwargs: dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} if status: kwargs["status"] = status @@ -1785,7 +1752,7 @@ def get_story_key_with_jira_connection(self) -> str: return "" return _story_key - def get_branch_required_status_checks(self) -> List[str]: + def get_branch_required_status_checks(self) -> list[str]: if self.repository.private: self.logger.info( f"{self.log_prefix} Repository is private, skipping getting branch protection required status checks" @@ -1796,11 +1763,11 @@ def get_branch_required_status_checks(self) -> List[str]: branch_protection = pull_request_branch.get_protection() return branch_protection.required_status_checks.contexts - def get_all_required_status_checks(self) -> List[str]: + def get_all_required_status_checks(self) -> list[str]: if not hasattr(self, "pull_request_branch"): self.pull_request_branch = self.pull_request.base.ref - all_required_status_checks: List[str] = [] + all_required_status_checks: list[str] = [] branch_required_status_checks = self.get_branch_required_status_checks() if self.tox: all_required_status_checks.append(TOX_STR) @@ -1859,7 +1826,7 @@ def set_jira_in_pull_request(self) -> None: ) def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, reviewed_user: str) -> None: - _target_branches: List[str] = command_args.split() + _target_branches: list[str] = command_args.split() _exits_target_branches: Set[str] = set() _non_exits_target_branches_msg: str = "" @@ -1877,7 +1844,7 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, if _exits_target_branches: if not self.pull_request.is_merged(): - cp_labels: List[str] = [ + cp_labels: list[str] = [ f"{CHERRY_PICK_LABEL_PREFIX}{_target_branch}" for _target_branch in _exits_target_branches ] info_msg: str = f""" @@ -1896,10 +1863,10 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, ) def process_retest_command(self, issue_comment_id: int, command_args: str) -> None: - _target_tests: List[str] = command_args.split() - _not_supported_retests: List[str] = [] - _supported_retests: List[str] = [] - _retests_to_func_map: Dict[str, Callable] = { + _target_tests: list[str] = command_args.split() + _not_supported_retests: list[str] = [] + _supported_retests: list[str] = [] + _retests_to_func_map: dict[str, Callable] = { TOX_STR: self._run_tox, PRE_COMMIT_STR: self._run_pre_commit, BUILD_CONTAINER_STR: self._run_build_container, @@ -1939,7 +1906,7 @@ def process_retest_command(self, issue_comment_id: int, command_args: str) -> No self.pull_request.create_issue_comment(msg) if _supported_retests: - _retest_to_exec: List[Future] = [] + _retest_to_exec: list[Future] = [] with ThreadPoolExecutor() as executor: for _test in _supported_retests: _retest_to_exec.append(executor.submit(_retests_to_func_map[_test])) @@ -2072,8 +2039,8 @@ def set_pull_request_automerge(self) -> None: self.logger.error(f"{self.log_prefix} Exception while setting auto merge: {exp}") @property - def _current_pull_request_supported_retest(self) -> List[str]: - current_pull_request_supported_retest: List[str] = [] + def _current_pull_request_supported_retest(self) -> list[str]: + current_pull_request_supported_retest: list[str] = [] if self.tox: current_pull_request_supported_retest.append(TOX_STR) @@ -2344,7 +2311,7 @@ def _add_reviewer_by_user_comment(self, reviewer: str) -> None: self.pull_request.create_issue_comment(_err) def conventional_title_check(self) -> None: - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Conventional Title", "summary": "", "text": "", diff --git a/webhook_server_container/tests/conftest.py b/webhook_server_container/tests/conftest.py index 2a4d476f..a7a560e3 100644 --- a/webhook_server_container/tests/conftest.py +++ b/webhook_server_container/tests/conftest.py @@ -49,6 +49,7 @@ def decoded_content(self): class Repository: def __init__(self): self.name = "test-repo" + self.full_name = "my-org/test-repo" def get_git_tree(self, sha: str, recursive: bool): return Tree("") @@ -104,7 +105,9 @@ def process_github_webhook(mocker, request): mocker.patch(f"{base_import_path}.get_github_repo_api", return_value=Repository()) process_github_webhook = ProcessGithubWehook( - {"repository": {"name": Repository().name}}, Headers({"X-GitHub-Event": "test-event"}), logging.getLogger() + hook_data={"repository": {"name": Repository().name, "full_name": Repository().full_name}}, + headers=Headers({"X-GitHub-Event": "test-event"}), + logger=logging.getLogger(), ) process_github_webhook.pull_request_branch = "main" if hasattr(request, "param") and request.param: diff --git a/webhook_server_container/tests/test_branch_protection.py b/webhook_server_container/tests/test_branch_protection.py index 71160038..029b191d 100644 --- a/webhook_server_container/tests/test_branch_protection.py +++ b/webhook_server_container/tests/test_branch_protection.py @@ -1,26 +1,30 @@ import os + import pytest from webhook_server_container.libs.config import Config from webhook_server_container.utils.github_repository_settings import ( - get_repo_branch_protection_rules, DEFAULT_BRANCH_PROTECTION, + get_repo_branch_protection_rules, ) @pytest.fixture() def branch_protection_rules(request, mocker): os.environ["WEBHOOK_SERVER_DATA_DIR"] = "webhook_server_container/tests/manifests" - config = Config() repo_name = "test-repo" + config = Config(repository=repo_name) data = config.data data.setdefault("branch_protection", request.param.get("global", {})) - data["repositories"][repo_name].setdefault("branch_protection", request.param.get("repo", {})) + data["repositories"][repo_name].setdefault("branch_protection", request.param.get("repo")) mocker.patch( "webhook_server_container.libs.config.Config.data", new_callable=mocker.PropertyMock, return_value=data ) - return get_repo_branch_protection_rules(config_data=config.data, repo_data=config.data["repositories"][repo_name])[ - "branch_protection" - ] + mocker.patch( + "webhook_server_container.libs.config.Config.repository_data", + new_callable=mocker.PropertyMock, + return_value=data["repositories"][repo_name], + ) + return get_repo_branch_protection_rules(config=config) @pytest.mark.parametrize( @@ -92,4 +96,5 @@ def test_branch_protection_setup(branch_protection_rules, expected): for key in expected: if branch_protection_rules[key] != expected[key]: mismatch[key] = f"Expected value for {key}: {expected[key]}, actual: {branch_protection_rules[key]}" + assert not mismatch, f"Following mismatches are found: {mismatch}" diff --git a/webhook_server_container/utils/github_repository_settings.py b/webhook_server_container/utils/github_repository_settings.py index adb8add8..653f3501 100644 --- a/webhook_server_container/utils/github_repository_settings.py +++ b/webhook_server_container/utils/github_repository_settings.py @@ -1,4 +1,5 @@ import contextlib +import copy import os from concurrent.futures import Future, ThreadPoolExecutor, as_completed from copy import deepcopy @@ -28,7 +29,6 @@ get_api_with_highest_rate_limit, get_future_results, get_logger_with_params, - get_value_from_dicts, ) DEFAULT_BRANCH_PROTECTION = { @@ -186,16 +186,11 @@ def set_repository_labels(repository: Repository, api_user: str) -> str: return f"[API user {api_user}] - {repository}: Setting repository labels is done" -def get_repo_branch_protection_rules(config_data: dict[str, Any], repo_data: dict[str, Any]) -> dict[str, Any]: - for rule_name in DEFAULT_BRANCH_PROTECTION: - repo_data.setdefault("branch_protection", {}) - repo_data["branch_protection"][rule_name] = get_value_from_dicts( - primary_dict=repo_data["branch_protection"], - secondary_dict=config_data.get("branch_protection", {}), - key=rule_name, - return_on_none=DEFAULT_BRANCH_PROTECTION[rule_name], - ) - return repo_data +def get_repo_branch_protection_rules(config: Config) -> dict[str, Any]: + branch_protection = copy.deepcopy(DEFAULT_BRANCH_PROTECTION) + repo_branch_protection = config.get_value(value="branch_protection", return_on_none={}) + branch_protection.update(repo_branch_protection) + return branch_protection def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, Any]]) -> None: @@ -203,10 +198,8 @@ def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, An logger.info("Processing repositories") config_data = config_.data - default_status_checks: List[str] = config_data.get("default-status-checks", []) + [ - CAN_BE_MERGED_STR, - ] - docker: Optional[Dict[str, str]] = config_data.get("docker") + + docker: Dict[str, str] | None = config_data.get("docker") if docker: logger.info("Login in to docker.io") docker_username: str = docker["username"] @@ -216,15 +209,17 @@ def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, An futures = [] with ThreadPoolExecutor() as executor: for repo, data in config_data["repositories"].items(): - data = get_repo_branch_protection_rules(config_data=config_data, repo_data=data) + config = Config(repository=repo) + branch_protection = get_repo_branch_protection_rules(config=config) futures.append( executor.submit( set_repository, **{ "data": data, - "default_status_checks": default_status_checks, "apis_dict": apis_dict, "repository_name": repo, + "branch_protection": branch_protection, + "config": config, }, ) ) @@ -233,14 +228,17 @@ def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, An def set_repository( - repository_name: str, data: Dict[str, Any], default_status_checks: List[str], apis_dict: dict[str, dict[str, Any]] + repository_name: str, + data: Dict[str, Any], + apis_dict: dict[str, dict[str, Any]], + branch_protection: dict[str, Any], + config: Config, ) -> Tuple[bool, str, Callable]: logger = get_logger_with_params(name="github-repository-settings") full_repository_name: str = data["name"] logger.info(f"Processing repository {full_repository_name}") - protected_branches: Dict[str, Any] = data.get("protected-branches", {}) - repo_branch_protection_rules: Dict[str, Any] = data["branch_protection"] + protected_branches: Dict[str, Any] = config.get_value(value="protected-branches", return_on_none={}) github_api = apis_dict[repository_name].get("api") api_user = apis_dict[repository_name].get("user", "") @@ -271,7 +269,11 @@ def set_repository( if not branch: logger.error(f"[API user {api_user}] - {full_repository_name}: Failed to get branch {branch_name}") continue - + default_status_checks: List[str] = config.get_value( + value="default-status-checks", return_on_none=[] + ) + [ + CAN_BE_MERGED_STR, + ] _default_status_checks = deepcopy(default_status_checks) ( include_status_checks, @@ -294,7 +296,7 @@ def set_repository( "github_api": github_api, "api_user": api_user, }, - **repo_branch_protection_rules, + **branch_protection, ) ) @@ -324,11 +326,12 @@ def set_all_in_progress_check_runs_to_queued(config_: Config, apis_dict: dict[st with ThreadPoolExecutor() as executor: for repo, data in config_.data["repositories"].items(): + config = Config(repository=repo) futures.append( executor.submit( set_repository_check_runs_to_queued, **{ - "config_": config_, + "config_": config, "data": data, "github_api": apis_dict[repo]["api"], "check_runs": check_runs, @@ -402,6 +405,7 @@ def get_repository_github_app_api(config_: Config, repository_name: str) -> Opti def get_repository_api(repository: str) -> tuple[str, github.Github | None, str]: + config = Config(repository=repository) github_api, _, api_user = get_api_with_highest_rate_limit(config=config, repository_name=repository) return repository, github_api, api_user diff --git a/webhook_server_container/utils/helpers.py b/webhook_server_container/utils/helpers.py index 952676d8..43802b3c 100644 --- a/webhook_server_container/utils/helpers.py +++ b/webhook_server_container/utils/helpers.py @@ -38,17 +38,10 @@ def get_value_from_dicts( def get_logger_with_params(name: str, repository_name: str = "") -> Logger: - _config = Config() - config_data = _config.data # Global repositories configuration - repo_data: dict[str, Any] = {} + _config = Config(repository=repository_name) - if repository_name: - repo_data = _config.repository_data(repository_name=repository_name) # Specific repository configuration - - log_level: str = get_value_from_dicts( - primary_dict=repo_data, secondary_dict=config_data, key="log-level", return_on_none="INFO" - ) - log_file: str = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="log-file") + log_level: str = _config.get_value(value="log-level", return_on_none="INFO") + log_file: str = _config.get_value(value="log-file") return get_logger(name=name, filename=log_file, level=log_level, file_max_bytes=1048576 * 50) # 50MB @@ -147,15 +140,9 @@ def run_command( return False, out_decoded, err_decoded -def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> list[tuple[github.Github, str]]: +def get_apis_and_tokes_from_config(config: Config) -> list[tuple[github.Github, str]]: apis_and_tokens: list[tuple[github.Github, str]] = [] - - tokens = get_value_from_dicts( - primary_dict=config.repository_data(repository_name=repository_name), - secondary_dict=config.data, - key="github-tokens", - return_on_none=[], - ) + tokens = config.get_value(value="github-tokens") for _token in tokens: apis_and_tokens.append((github.Github(auth=github.Auth.Token(_token)), _token)) @@ -192,7 +179,7 @@ def get_api_with_highest_rate_limit( logger.debug(msg) - apis_and_tokens = get_apis_and_tokes_from_config(config=config, repository_name=repository_name) + apis_and_tokens = get_apis_and_tokes_from_config(config=config) for _api, _token in apis_and_tokens: _api_user = _api.get_user().login rate_limit = _api.get_rate_limit() diff --git a/webhook_server_container/utils/webhook.py b/webhook_server_container/utils/webhook.py index e7224da8..57026cb7 100644 --- a/webhook_server_container/utils/webhook.py +++ b/webhook_server_container/utils/webhook.py @@ -57,18 +57,20 @@ def process_github_webhook( def get_repository_api(repository: str) -> tuple[str, github.Github | None, str]: + config = Config(repository=repository) github_api, _, api_user = get_api_with_highest_rate_limit(config=config, repository_name=repository) return repository, github_api, api_user -def create_webhook(config_: Config) -> None: +def create_webhook() -> None: + config = Config() LOGGER.info("Preparing webhook configuration") - webhook_ip = config_.data["webhook_ip"] + webhook_ip = config.data["webhook_ip"] apis_dict: dict[str, dict[str, Any]] = {} apis: list = [] with ThreadPoolExecutor() as executor: - for repo, data in config_.data["repositories"].items(): + for repo, data in config.data["repositories"].items(): apis.append( executor.submit( get_repository_api, @@ -84,7 +86,7 @@ def create_webhook(config_: Config) -> None: futures = [] with ThreadPoolExecutor() as executor: - for repo, data in config_.data["repositories"].items(): + for repo, data in config.data["repositories"].items(): futures.append( executor.submit( process_github_webhook, @@ -96,5 +98,4 @@ def create_webhook(config_: Config) -> None: if __name__ == "__main__": - config = Config() - create_webhook(config_=config) + create_webhook() From 86bb1752d9b33d418b8dee6892565b35b50c3b4e Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sat, 8 Mar 2025 02:11:12 +0200 Subject: [PATCH 2/4] Remove unused function get_value_from_dicts --- webhook_server_container/utils/helpers.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/webhook_server_container/utils/helpers.py b/webhook_server_container/utils/helpers.py index 43802b3c..b8d5f015 100644 --- a/webhook_server_container/utils/helpers.py +++ b/webhook_server_container/utils/helpers.py @@ -16,27 +16,6 @@ from webhook_server_container.libs.config import Config -def get_value_from_dicts( - primary_dict: dict[Any, Any], - secondary_dict: dict[Any, Any], - key: str, - return_on_none: Any = None, -) -> Any: - """ - Get value from two dictionaries. - - If value is not found in primary_dict, try to get it from secondary_dict, otherwise return return_on_none. - """ - if primary_dict.get(key) is not None: - return primary_dict[key] - - elif secondary_dict.get(key) is not None: - return secondary_dict[key] - - else: - return return_on_none - - def get_logger_with_params(name: str, repository_name: str = "") -> Logger: _config = Config(repository=repository_name) From 41f2ffd57a659987021ebaccbde0e06034ef42c8 Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sun, 9 Mar 2025 08:39:18 +0200 Subject: [PATCH 3/4] remove <> from test config file --- .../tests/manifests/config.yaml | 44 +++++++++---------- .../tests/test_repo_data_from_config.py | 12 ++--- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/webhook_server_container/tests/manifests/config.yaml b/webhook_server_container/tests/manifests/config.yaml index 93c2cd6f..1538b566 100644 --- a/webhook_server_container/tests/manifests/config.yaml +++ b/webhook_server_container/tests/manifests/config.yaml @@ -3,14 +3,14 @@ log-file: webhook-server.log # Set global log file, change take effect immediate github-app-id: 123456 # GitHub app id github-toekns: - - - - + - GITHIB TOKEN1 + - GITHIB TOKEN2 -webhook_ip: +webhook_ip: HTTP://IP OR URL:PORT docker: # Used to pull images from docker.io - username: - password: + username: username + password: password # pragma: allowlist secret default-status-checks: - "WIP" @@ -22,21 +22,21 @@ auto-verified-and-merged-users: - "pre-commit-ci[bot]" jira: - server: - project: - token: + server: JIRA URL + project: PROJECT KEY + token: JIRA TOKEN user-mapping: - : # if github user is not the same as jira + GITHUB USER: # if github user is not the same as jira repositories: test-repo: name: my-org/test-repo log-level: DEBUG # Override global log-level for repository log-file: test-repo.log # Override global log-file for repository - slack_webhook_url: # Send notification to slack on several operations + slack_webhook_url: Slack webhook url # Send notification to slack on several operations verified_job: true pypi: - token: + token: PYPI TOKEN events: # To listen to all events do not send events - push @@ -60,10 +60,10 @@ repositories: exclude-runs: - "SonarCloud Code Analysis" container: - username: - password: - repository: - tag: + username: registry username + password: registry_password # pragma: allowlist secret + repository: registry_repository_full_path + tag: image_tag release: true # Push image to registry on new release with release as the tag build-args: # build args to send to podman build command - my-build-arg1=1 @@ -75,8 +75,8 @@ repositories: - "my[bot]" github-tokens: # override GitHub tokens per repository - - - - + - GITHUB TOKEN1 + - GITHUB TOKEN2 can-be-merged-required-labels: # check for extra labels to set PR as can be merged - my-label1 @@ -85,9 +85,9 @@ repositories: jira-tracking: true jira: # override Jira global settings - server: - project: - token: - epic: # Optional + server: JIRA URL + project: PROJECT KEY + token: JIRA TOKEN + epic: EPIC KEY # Optional user-mapping: - : # if github user is not the same as jira + GITHUB USER: # if github user is not the same as jira diff --git a/webhook_server_container/tests/test_repo_data_from_config.py b/webhook_server_container/tests/test_repo_data_from_config.py index a7073101..cef45e5a 100644 --- a/webhook_server_container/tests/test_repo_data_from_config.py +++ b/webhook_server_container/tests/test_repo_data_from_config.py @@ -3,15 +3,15 @@ def test_repo_data_from_config_repository_found(process_github_webhook): assert process_github_webhook.repository_full_name == "my-org/test-repo" assert process_github_webhook.github_app_id == 123456 - assert process_github_webhook.pypi == {"token": ""} + assert process_github_webhook.pypi == {"token": "PYPI TOKEN"} assert process_github_webhook.verified_job assert process_github_webhook.tox_python_version == "3.8" - assert process_github_webhook.slack_webhook_url == "" - assert process_github_webhook.container_repository_username == "" - assert process_github_webhook.container_repository_password == "" - assert process_github_webhook.container_repository == "" + assert process_github_webhook.slack_webhook_url == "Slack webhook url" + assert process_github_webhook.container_repository_username == "registry username" + assert process_github_webhook.container_repository_password == "registry_password" # pragma: allowlist secret + assert process_github_webhook.container_repository == "registry_repository_full_path" assert process_github_webhook.dockerfile == "Dockerfile" - assert process_github_webhook.container_tag == "" + assert process_github_webhook.container_tag == "image_tag" assert process_github_webhook.container_build_args == ["my-build-arg1=1", "my-build-arg2=2"] assert process_github_webhook.container_command_args == ["--format docker"] assert process_github_webhook.container_release From 10427cc8ce4aa56072c61b145e75ddb6d4ca3d62 Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sun, 9 Mar 2025 08:45:46 +0200 Subject: [PATCH 4/4] Use built=in types --- webhook_server_container/app.py | 14 +++---- webhook_server_container/libs/github_api.py | 20 ++++----- webhook_server_container/libs/jira_api.py | 9 ++-- webhook_server_container/utils/constants.py | 10 ++--- .../utils/github_repository_settings.py | 42 +++++++++---------- webhook_server_container/utils/webhook.py | 12 +++--- 6 files changed, 52 insertions(+), 55 deletions(-) diff --git a/webhook_server_container/app.py b/webhook_server_container/app.py index 024f8002..e0e7609f 100644 --- a/webhook_server_container/app.py +++ b/webhook_server_container/app.py @@ -1,12 +1,10 @@ -from typing import Any, Dict import os import sys +from typing import Any -from fastapi import Request import requests import urllib3 - -from fastapi import FastAPI +from fastapi import FastAPI, Request from webhook_server_container.libs.github_api import ProcessGithubWehook from webhook_server_container.utils.helpers import get_logger_with_params @@ -17,22 +15,22 @@ @FASTAPI_APP.get(f"{APP_URL_ROOT_PATH}/healthcheck") -def healthcheck() -> Dict[str, Any]: +def healthcheck() -> dict[str, Any]: return {"status": requests.codes.ok, "message": "Alive"} @FASTAPI_APP.post(APP_URL_ROOT_PATH) -async def process_webhook(request: Request) -> Dict[str, Any]: +async def process_webhook(request: Request) -> dict[str, Any]: logger_name: str = "main" logger = get_logger_with_params(name=logger_name) delivery_headers = request.headers.get("X-GitHub-Delivery", "") - process_failed_msg: Dict[str, Any] = { + process_failed_msg: dict[str, Any] = { "status": requests.codes.server_error, "message": "Process failed", "log_prefix": delivery_headers, } try: - hook_data: Dict[Any, Any] = await request.json() + hook_data: dict[Any, Any] = await request.json() except Exception as ex: logger.error(f"Error get JSON from request: {ex}") diff --git a/webhook_server_container/libs/github_api.py b/webhook_server_container/libs/github_api.py index 8fc7a559..af55420d 100644 --- a/webhook_server_container/libs/github_api.py +++ b/webhook_server_container/libs/github_api.py @@ -10,7 +10,7 @@ import time from concurrent.futures import Future, ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Generator, Optional, Set, Tuple +from typing import Any, Callable, Generator from uuid import uuid4 import requests @@ -385,7 +385,7 @@ def _repo_data_from_config(self) -> None: self.jira_server: str = self.jira["server"] self.jira_project: str = self.jira["project"] self.jira_token: str = self.jira["token"] - self.jira_epic: Optional[str] = self.jira.get("epic", "") + self.jira_epic: str | None = self.jira.get("epic", "") self.jira_user_mapping: dict[str, str] = self.jira.get("user-mapping", {}) self.jira_enabled_repository = all([self.jira_server, self.jira_project, self.jira_token]) if not self.jira_enabled_repository: @@ -402,7 +402,7 @@ def _repo_data_from_config(self) -> None: ) self.conventional_title: str = self.config.get_value(value="conventional-title") - def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: + def _get_pull_request(self, number: int | None = None) -> PullRequest: if number: return self.repository.get_pull(number) @@ -672,13 +672,13 @@ def set_run_pre_commit_check_queued(self) -> None: def set_run_pre_commit_check_in_progress(self) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, status=IN_PROGRESS_STR) - def set_run_pre_commit_check_failure(self, output: Optional[dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_failure(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=FAILURE_STR, output=output) - def set_run_pre_commit_check_success(self, output: Optional[dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_success(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=SUCCESS_STR, output=output) - def set_merge_check_queued(self, output: Optional[dict[str, Any]] = None) -> None: + def set_merge_check_queued(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, status=QUEUED_STR, output=output) def set_merge_check_in_progress(self) -> None: @@ -906,7 +906,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action in ("labeled", "unlabeled"): _check_for_merge: bool = False - _reviewer: Optional[str] = None + _reviewer: str | None = None action_labeled = hook_action == "labeled" labeled = self.hook_data["label"]["name"].lower() if labeled == CAN_BE_MERGED_STR: @@ -1585,7 +1585,7 @@ def set_check_run_status( check_run: str, status: str = "", conclusion: str = "", - output: Optional[dict[str, str]] = None, + output: dict[str, str] | None = None, ) -> None: kwargs: dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} @@ -1827,7 +1827,7 @@ def set_jira_in_pull_request(self) -> None: def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, reviewed_user: str) -> None: _target_branches: list[str] = command_args.split() - _exits_target_branches: Set[str] = set() + _exits_target_branches: set[str] = set() _non_exits_target_branches_msg: str = "" for _target_branch in _target_branches: @@ -2065,7 +2065,7 @@ def fix_podman_bug(self) -> None: shutil.rmtree("/tmp/storage-run-1000/containers", ignore_errors=True) shutil.rmtree("/tmp/storage-run-1000/libpod/tmp", ignore_errors=True) - def run_podman_command(self, command: str, pipe: bool = False) -> Tuple[bool, str, str]: + def run_podman_command(self, command: str, pipe: bool = False) -> tuple[bool, str, str]: rc, out, err = run_command(command=command, log_prefix=self.log_prefix, pipe=pipe) if rc: diff --git a/webhook_server_container/libs/jira_api.py b/webhook_server_container/libs/jira_api.py index 2574c4b5..440931b6 100644 --- a/webhook_server_container/libs/jira_api.py +++ b/webhook_server_container/libs/jira_api.py @@ -1,5 +1,6 @@ -from typing import Any, Dict, List -from jira import Issue, JIRA +from typing import Any + +from jira import JIRA, Issue from webhook_server_container.utils.helpers import get_logger_with_params @@ -16,7 +17,7 @@ def __init__(self, server: str, project: str, token: str): token_auth=self.token, ) self.conn.my_permissions() - self.fields: Dict[str, Any] = {"project": {"key": self.project}} + self.fields: dict[str, Any] = {"project": {"key": self.project}} def create_story(self, title: str, body: str, epic_key: str, assignee: str) -> str: self.fields.update({ @@ -51,5 +52,5 @@ def close_issue(self, key: str, comment: str = "") -> None: ) def get_epic_custom_field(self) -> str: - _epic_field_id: List[str] = [cf["id"] for cf in self.conn.fields() if "Epic Link" in cf["name"]] + _epic_field_id: list[str] = [cf["id"] for cf in self.conn.fields() if "Epic Link" in cf["name"]] return _epic_field_id[0] if _epic_field_id else "" diff --git a/webhook_server_container/utils/constants.py b/webhook_server_container/utils/constants.py index ac8e6713..a30edc37 100644 --- a/webhook_server_container/utils/constants.py +++ b/webhook_server_container/utils/constants.py @@ -1,5 +1,3 @@ -from typing import Dict - OTHER_MAIN_BRANCH: str = "master" TOX_STR: str = "tox" PRE_COMMIT_STR: str = "pre-commit" @@ -36,14 +34,14 @@ COMMAND_ASSIGN_REVIEWER_STR = "assign-reviewer" # Gitlab colors require a '#' prefix; e.g: # -USER_LABELS_DICT: Dict[str, str] = { +USER_LABELS_DICT: dict[str, str] = { HOLD_LABEL_STR: "B60205", VERIFIED_LABEL_STR: "0E8A16", WIP_STR: "B60205", LGTM_STR: "0E8A16", } -STATIC_LABELS_DICT: Dict[str, str] = { +STATIC_LABELS_DICT: dict[str, str] = { **USER_LABELS_DICT, CHERRY_PICKED_LABEL_PREFIX: "1D76DB", f"{SIZE_LABEL_PREFIX}L": "F5621C", @@ -57,7 +55,7 @@ HAS_CONFLICTS_LABEL_STR: "B60205", } -DYNAMIC_LABELS_DICT: Dict[str, str] = { +DYNAMIC_LABELS_DICT: dict[str, str] = { APPROVED_BY_LABEL_PREFIX: "0E8A16", LGTM_BY_LABEL_PREFIX: "DCED6F", COMMENTED_BY_LABEL_PREFIX: "D93F0B", @@ -67,7 +65,7 @@ JIRA_STR: "1D76DB", } -ALL_LABELS_DICT: Dict[str, str] = {**STATIC_LABELS_DICT, **DYNAMIC_LABELS_DICT} +ALL_LABELS_DICT: dict[str, str] = {**STATIC_LABELS_DICT, **DYNAMIC_LABELS_DICT} class REACTIONS: diff --git a/webhook_server_container/utils/github_repository_settings.py b/webhook_server_container/utils/github_repository_settings.py index 653f3501..5ddad40b 100644 --- a/webhook_server_container/utils/github_repository_settings.py +++ b/webhook_server_container/utils/github_repository_settings.py @@ -3,7 +3,7 @@ import os from concurrent.futures import Future, ThreadPoolExecutor, as_completed from copy import deepcopy -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable import github from github import Auth, Github, GithubIntegration @@ -57,7 +57,7 @@ def get_branch_sampler(repo: Repository, branch_name: str) -> Branch: def set_branch_protection( branch: Branch, repository: Repository, - required_status_checks: List[str], + required_status_checks: list[str], github_api: Github, strict: bool, require_code_owner_reviews: bool, @@ -119,10 +119,10 @@ def set_repository_settings(repository: Repository, api_user: str) -> None: def get_required_status_checks( repo: Repository, - data: Dict[str, Any], - default_status_checks: List[str], - exclude_status_checks: List[str], -) -> List[str]: + data: dict[str, Any], + default_status_checks: list[str], + exclude_status_checks: list[str], +) -> list[str]: if data.get("tox"): default_status_checks.append("tox") @@ -149,9 +149,9 @@ def get_required_status_checks( return default_status_checks -def get_user_configures_status_checks(status_checks: Dict[str, Any]) -> Tuple[List[str], List[str]]: - include_status_checks: List[str] = [] - exclude_status_checks: List[str] = [] +def get_user_configures_status_checks(status_checks: dict[str, Any]) -> tuple[list[str], list[str]]: + include_status_checks: list[str] = [] + exclude_status_checks: list[str] = [] if status_checks: include_status_checks = status_checks.get("include-runs", []) exclude_status_checks = status_checks.get("exclude-runs", []) @@ -163,7 +163,7 @@ def set_repository_labels(repository: Repository, api_user: str) -> str: logger = get_logger_with_params(name="github-repository-settings") logger.info(f"[API user {api_user}] - Set repository {repository.name} labels") - repository_labels: Dict[str, Dict[str, Any]] = {} + repository_labels: dict[str, dict[str, Any]] = {} for label in repository.get_labels(): repository_labels[label.name.lower()] = { "object": label, @@ -199,7 +199,7 @@ def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, An logger.info("Processing repositories") config_data = config_.data - docker: Dict[str, str] | None = config_data.get("docker") + docker: dict[str, str] | None = config_data.get("docker") if docker: logger.info("Login in to docker.io") docker_username: str = docker["username"] @@ -229,16 +229,16 @@ def set_repositories_settings(config_: Config, apis_dict: dict[str, dict[str, An def set_repository( repository_name: str, - data: Dict[str, Any], + data: dict[str, Any], apis_dict: dict[str, dict[str, Any]], branch_protection: dict[str, Any], config: Config, -) -> Tuple[bool, str, Callable]: +) -> tuple[bool, str, Callable]: logger = get_logger_with_params(name="github-repository-settings") full_repository_name: str = data["name"] logger.info(f"Processing repository {full_repository_name}") - protected_branches: Dict[str, Any] = config.get_value(value="protected-branches", return_on_none={}) + protected_branches: dict[str, Any] = config.get_value(value="protected-branches", return_on_none={}) github_api = apis_dict[repository_name].get("api") api_user = apis_dict[repository_name].get("user", "") @@ -260,7 +260,7 @@ def set_repository( logger.warning, ) - futures: List["Future"] = [] + futures: list["Future"] = [] with ThreadPoolExecutor() as executor: for branch_name, status_checks in protected_branches.items(): @@ -269,7 +269,7 @@ def set_repository( if not branch: logger.error(f"[API user {api_user}] - {full_repository_name}: Failed to get branch {branch_name}") continue - default_status_checks: List[str] = config.get_value( + default_status_checks: list[str] = config.get_value( value="default-status-checks", return_on_none=[] ) + [ CAN_BE_MERGED_STR, @@ -322,7 +322,7 @@ def set_all_in_progress_check_runs_to_queued(config_: Config, apis_dict: dict[st BUILD_CONTAINER_STR, PRE_COMMIT_STR, ) - futures: List["Future"] = [] + futures: list["Future"] = [] with ThreadPoolExecutor() as executor: for repo, data in config_.data["repositories"].items(): @@ -345,11 +345,11 @@ def set_all_in_progress_check_runs_to_queued(config_: Config, apis_dict: dict[st def set_repository_check_runs_to_queued( config_: Config, - data: Dict[str, Any], + data: dict[str, Any], github_api: Github, - check_runs: Tuple[str], + check_runs: tuple[str], api_user: str, -) -> Tuple[bool, str, Callable]: +) -> tuple[bool, str, Callable]: logger = get_logger_with_params(name="github-repository-settings") repository: str = data["name"] @@ -381,7 +381,7 @@ def set_repository_check_runs_to_queued( return True, f"[API user {api_user}] - {repository}: Set check run status to {QUEUED_STR} is done", logger.debug -def get_repository_github_app_api(config_: Config, repository_name: str) -> Optional[Github]: +def get_repository_github_app_api(config_: Config, repository_name: str) -> Github | None: logger = get_logger_with_params(name="github-repository-settings") logger.debug("Getting repositories GitHub app API") diff --git a/webhook_server_container/utils/webhook.py b/webhook_server_container/utils/webhook.py index 57026cb7..ebf80b5b 100644 --- a/webhook_server_container/utils/webhook.py +++ b/webhook_server_container/utils/webhook.py @@ -1,5 +1,5 @@ from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable import github from github.Hook import Hook @@ -16,8 +16,8 @@ def process_github_webhook( - repository_name: str, data: Dict[str, Any], webhook_ip: str, apis_dict: dict[str, dict[str, Any]] -) -> Tuple[bool, str, Callable]: + repository_name: str, data: dict[str, Any], webhook_ip: str, apis_dict: dict[str, dict[str, Any]] +) -> tuple[bool, str, Callable]: full_repository_name: str = data["name"] github_api = apis_dict[repository_name].get("api") api_user = apis_dict[repository_name].get("user") @@ -29,11 +29,11 @@ def process_github_webhook( if not repo: return False, f"[API user {api_user}] - Could not find repository {full_repository_name}", LOGGER.error - config_: Dict[str, str] = {"url": f"{webhook_ip}/webhook_server", "content_type": "json"} - events: List[str] = data.get("events", ["*"]) + config_: dict[str, str] = {"url": f"{webhook_ip}/webhook_server", "content_type": "json"} + events: list[str] = data.get("events", ["*"]) try: - hooks: List[Hook] = list(repo.get_hooks()) + hooks: list[Hook] = list(repo.get_hooks()) except Exception as ex: return ( False,