From fc56af80f420a0089b0fa1a14cc2b393e0319715 Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sat, 9 Nov 2024 11:48:19 +0200 Subject: [PATCH 1/2] Add pre-commiy hook: pyupgrade --- .pre-commit-config.yaml | 9 +- webhook_server_container/libs/github_api.py | 148 ++++++++++---------- webhook_server_container/utils/helpers.py | 38 +++-- 3 files changed, 99 insertions(+), 96 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cd1e2fb..f679f43d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,7 +36,7 @@ repos: - id: detect-secrets - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.7.2 + rev: v0.7.3 hooks: - id: ruff - id: ruff-format @@ -59,6 +59,11 @@ repos: additional_dependencies: [types-requests, types-PyYAML] - repo: https://github.com/renovatebot/pre-commit-hooks - rev: 39.0.0 + rev: 39.8.0 hooks: - id: renovate-config-validator + + - repo: https://github.com/asottile/pyupgrade + rev: v3.19.0 + hooks: + - id: pyupgrade diff --git a/webhook_server_container/libs/github_api.py b/webhook_server_container/libs/github_api.py index 5fd3cbb5..a2aded53 100644 --- a/webhook_server_container/libs/github_api.py +++ b/webhook_server_container/libs/github_api.py @@ -10,7 +10,7 @@ import time from concurrent.futures import Future, ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple +from typing import Any, Callable, Generator from stringcolor import cs from github.Branch import Branch @@ -87,7 +87,7 @@ class RepositoryNotFoundError(Exception): class ProcessGithubWehookError(Exception): - def __init__(self, err: Dict[str, str]): + def __init__(self, err: dict[str, str]): self.err = err def __str__(self) -> str: @@ -95,7 +95,7 @@ def __str__(self) -> str: class ProcessGithubWehook: - def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: + def __init__(self, hook_data: dict[Any, Any], headers: Headers, logger: logging.Logger) -> None: self.logger = logger self.logger.name = "ProcessGithubWehook" self.hook_data = hook_data @@ -104,10 +104,10 @@ def __init__(self, hook_data: Dict[Any, Any], headers: Headers, logger: logging. self.parent_committer: str = "" self.jira_track_pr: bool = False self.issue_title: str = "" - self.all_required_status_checks: List[str] = [] + self.all_required_status_checks: list[str] = [] self.x_github_delivery: str = self.headers.get("X-GitHub-Delivery", "") self.github_event: str = self.headers["X-GitHub-Event"] - self.owners_content: Dict[str, Any] = {} + self.owners_content: dict[str, Any] = {} self.config = Config() self.log_prefix = self.prepare_log_prefix() @@ -252,7 +252,7 @@ def add_api_users_to_auto_verified_and_merged_users(self) -> None: self.auto_verified_and_merged_users.extend([_api[0].get_user().login for _api in apis_and_tokens]) def _get_reposiroty_color_for_log_prefix(self) -> str: - def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: + def _get_random_color(_colors: list[str], _json: dict[str, str]) -> str: color = random.choice(_colors) _json[self.repository_name] = color @@ -261,8 +261,8 @@ def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: return self.repository_name - _all_colors: List[str] = [] - color_json: Dict[str, str] + _all_colors: list[str] = [] + color_json: dict[str, str] _colors_to_exclude = ("blue", "white", "black", "grey") color_file: str = os.path.join(self.config.data_dir, "log-colors.json") @@ -300,7 +300,7 @@ def _get_random_color(_colors: List[str], _json: Dict[str, str]) -> str: return self.repository_name - def prepare_log_prefix(self, pull_request: Optional[PullRequest] = None) -> str: + def prepare_log_prefix(self, pull_request: PullRequest | None = None) -> str: _repository_color = self._get_reposiroty_color_for_log_prefix() _id = self.x_github_delivery.split("-", 1)[-1] return ( @@ -310,7 +310,7 @@ def prepare_log_prefix(self, pull_request: Optional[PullRequest] = None) -> str: ) def process_pull_request_check_run_webhook_data(self) -> None: - _check_run: Dict[str, Any] = self.hook_data["check_run"] + _check_run: dict[str, Any] = self.hook_data["check_run"] if _check_run["action"] != "completed": self.logger.debug(f"{self.log_prefix} check run action is not completed, skipping") return @@ -354,14 +354,14 @@ def _repo_data_from_config(self) -> None: self.github_app_id: str = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, key="github-app-id" ) - self.pypi: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="pypi") + self.pypi: dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="pypi") self.verified_job: bool = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, key="verified-job", return_on_none=True, ) - self.tox: Dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="tox") + self.tox: dict[str, str] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="tox") self.tox_python_version: str = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, @@ -371,7 +371,7 @@ def _repo_data_from_config(self) -> None: self.slack_webhook_url: str = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, key="slack_webhook_url" ) - self.build_and_push_container: Dict[str, Any] = repo_data.get("container", {}) + self.build_and_push_container: dict[str, Any] = repo_data.get("container", {}) if self.build_and_push_container: self.container_repository_username: str = self.build_and_push_container["username"] self.container_repository_password: str = self.build_and_push_container["password"] @@ -393,13 +393,13 @@ def _repo_data_from_config(self) -> None: self.jira_tracking: bool = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, key="jira-tracking" ) - self.jira: Dict[str, Any] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="jira") + self.jira: dict[str, Any] = get_value_from_dicts(primary_dict=repo_data, secondary_dict=config_data, key="jira") if self.jira_tracking and self.jira: self.jira_server: str = self.jira["server"] self.jira_project: str = self.jira["project"] self.jira_token: str = self.jira["token"] - self.jira_epic: Optional[str] = self.jira.get("epic", "") - self.jira_user_mapping: Dict[str, str] = self.jira.get("user-mapping", {}) + self.jira_epic: str | None = self.jira.get("epic", "") + self.jira_user_mapping: dict[str, str] = self.jira.get("user-mapping", {}) self.jira_enabled_repository = all([self.jira_server, self.jira_project, self.jira_token]) if not self.jira_enabled_repository: self.logger.error( @@ -407,7 +407,7 @@ def _repo_data_from_config(self) -> None: f"Project: {self.jira_project}, Token: {self.jira_token}" ) - self.auto_verified_and_merged_users: List[str] = get_value_from_dicts( + self.auto_verified_and_merged_users: list[str] = get_value_from_dicts( primary_dict=repo_data, secondary_dict=config_data, key="auto-verified-and-merged-users", @@ -420,7 +420,7 @@ def _repo_data_from_config(self) -> None: return_on_none=[], ) - def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: + def _get_pull_request(self, number: int | None = None) -> PullRequest: if number: return self.repository.get_pull(number) @@ -430,7 +430,7 @@ def _get_pull_request(self, number: Optional[int] = None) -> PullRequest: except GithubException: continue - commit: Dict[str, Any] = self.hook_data.get("commit", {}) + commit: dict[str, Any] = self.hook_data.get("commit", {}) if commit: commit_obj = self.repository.get_commit(commit["sha"]) with contextlib.suppress(Exception): @@ -444,7 +444,7 @@ def _get_last_commit(self) -> Commit: def label_exists_in_pull_request(self, label: str) -> bool: return any(lb for lb in self.pull_request_labels_names() if lb == label) - def pull_request_labels_names(self) -> List[str]: + def pull_request_labels_names(self) -> list[str]: return [lb.name for lb in self.pull_request.labels] if self.pull_request else [] def skip_if_pull_request_already_merged(self) -> bool: @@ -557,7 +557,7 @@ def _error(_out: str, _err: str) -> None: tar_gz_file = tar_gz_file.strip() - commands: List[str] = [ + commands: list[str] = [ f"uvx {uv_cmd_dir} twine check {_dist_dir}/{tar_gz_file}", f"uvx {uv_cmd_dir} twine upload --username __token__ --password {self.pypi['token']} {_dist_dir}/{tar_gz_file} --skip-existing", ] @@ -575,14 +575,14 @@ def _error(_out: str, _err: str) -> None: """ self.send_slack_message(message=message, webhook_url=self.slack_webhook_url) - def get_owners_content(self) -> Dict[str, Any]: + def get_owners_content(self) -> dict[str, Any]: try: owners_content: list[ContentFile] | ContentFile = self.repository.get_contents("OWNERS") if isinstance(owners_content, list): self.logger.debug(f"{self.log_prefix} Found more then one OWNERS file, using the first one") owners_content = owners_content[0] - _content: Dict[str, Any] = yaml.safe_load(owners_content.decoded_content) + _content: dict[str, Any] = yaml.safe_load(owners_content.decoded_content) self.logger.debug(f"{self.log_prefix} OWNERS file content: {_content}") return _content @@ -591,10 +591,10 @@ def get_owners_content(self) -> Dict[str, Any]: return {} @property - def reviewers(self) -> List[str]: - bc_reviewers: List[str] = self.owners_content.get("reviewers", []) + def reviewers(self) -> list[str]: + bc_reviewers: list[str] = self.owners_content.get("reviewers", []) if isinstance(bc_reviewers, dict): - _reviewers: List[str] = self.owners_content.get("reviewers", {}).get("any", []) + _reviewers: list[str] = self.owners_content.get("reviewers", {}).get("any", []) else: _reviewers = bc_reviewers @@ -602,7 +602,7 @@ def reviewers(self) -> List[str]: return _reviewers @property - def files_reviewers(self) -> Dict[str, str]: + def files_reviewers(self) -> dict[str, str]: _reviewers = self.owners_content.get("reviewers", {}) if isinstance(_reviewers, dict): return _reviewers.get("files", {}) @@ -610,7 +610,7 @@ def files_reviewers(self) -> Dict[str, str]: return {} @property - def folders_reviewers(self) -> Dict[str, str]: + def folders_reviewers(self) -> dict[str, str]: _reviewers = self.owners_content.get("reviewers", {}) if isinstance(_reviewers, dict): return _reviewers.get("folders", {}) @@ -618,7 +618,7 @@ def folders_reviewers(self) -> Dict[str, str]: return {} @property - def approvers(self) -> List[str]: + def approvers(self) -> list[str]: return self.owners_content.get("approvers", []) def list_changed_commit_files(self) -> list[str]: @@ -636,7 +636,7 @@ def assign_reviewers(self) -> None: if any(cf for cf in changed_files if _folder in str(Path(cf).parent)): reviewers_to_add.extend(_folder_reviewers) - _to_add: List[str] = list(set(reviewers_to_add)) + _to_add: list[str] = list(set(reviewers_to_add)) self.logger.debug(f"{self.log_prefix} Reviewers to add: {', '.join(_to_add)}") for reviewer in _to_add: if reviewer != self.pull_request.user.login: @@ -715,10 +715,10 @@ def set_run_tox_check_queued(self) -> None: def set_run_tox_check_in_progress(self) -> None: return self.set_check_run_status(check_run=TOX_STR, status=IN_PROGRESS_STR) - def set_run_tox_check_failure(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=FAILURE_STR, output=output) - def set_run_tox_check_success(self, output: Dict[str, Any]) -> None: + def set_run_tox_check_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=TOX_STR, conclusion=SUCCESS_STR, output=output) def set_run_pre_commit_check_queued(self) -> None: @@ -730,13 +730,13 @@ def set_run_pre_commit_check_queued(self) -> None: def set_run_pre_commit_check_in_progress(self) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, status=IN_PROGRESS_STR) - def set_run_pre_commit_check_failure(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_failure(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=FAILURE_STR, output=output) - def set_run_pre_commit_check_success(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_run_pre_commit_check_success(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=PRE_COMMIT_STR, conclusion=SUCCESS_STR, output=output) - def set_merge_check_queued(self, output: Optional[Dict[str, Any]] = None) -> None: + def set_merge_check_queued(self, output: dict[str, Any] | None = None) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, status=QUEUED_STR, output=output) def set_merge_check_in_progress(self) -> None: @@ -745,7 +745,7 @@ def set_merge_check_in_progress(self) -> None: def set_merge_check_success(self) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=SUCCESS_STR) - def set_merge_check_failure(self, output: Dict[str, Any]) -> None: + def set_merge_check_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CAN_BE_MERGED_STR, conclusion=FAILURE_STR, output=output) def set_container_build_queued(self) -> None: @@ -757,10 +757,10 @@ def set_container_build_queued(self) -> None: def set_container_build_in_progress(self) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, status=IN_PROGRESS_STR) - def set_container_build_success(self, output: Dict[str, Any]) -> None: + def set_container_build_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=SUCCESS_STR, output=output) - def set_container_build_failure(self, output: Dict[str, Any]) -> None: + def set_container_build_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=BUILD_CONTAINER_STR, conclusion=FAILURE_STR, output=output) def set_python_module_install_queued(self) -> None: @@ -772,19 +772,19 @@ def set_python_module_install_queued(self) -> None: def set_python_module_install_in_progress(self) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, status=IN_PROGRESS_STR) - def set_python_module_install_success(self, output: Dict[str, Any]) -> None: + def set_python_module_install_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=SUCCESS_STR, output=output) - def set_python_module_install_failure(self, output: Dict[str, Any]) -> None: + def set_python_module_install_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=PYTHON_MODULE_INSTALL_STR, conclusion=FAILURE_STR, output=output) def set_cherry_pick_in_progress(self) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, status=IN_PROGRESS_STR) - def set_cherry_pick_success(self, output: Dict[str, Any]) -> None: + def set_cherry_pick_success(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=SUCCESS_STR, output=output) - def set_cherry_pick_failure(self, output: Dict[str, Any]) -> None: + def set_cherry_pick_failure(self, output: dict[str, Any]) -> None: return self.set_check_run_status(check_run=CHERRY_PICKED_LABEL_PREFIX, conclusion=FAILURE_STR, output=output) def create_issue_for_new_pull_request(self) -> None: @@ -870,7 +870,7 @@ def process_comment_webhook_data(self) -> None: ) return - _user_commands: List[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] + _user_commands: list[str] = [_cmd.strip("/") for _cmd in body.strip().splitlines() if _cmd.startswith("/")] user_login: str = self.hook_data["sender"]["login"] for user_command in _user_commands: @@ -884,7 +884,7 @@ def process_pull_request_webhook_data(self) -> None: hook_action: str = self.hook_data["action"] self.logger.info(f"{self.log_prefix} hook_action is: {hook_action}") - pull_request_data: Dict[str, Any] = self.hook_data["pull_request"] + pull_request_data: dict[str, Any] = self.hook_data["pull_request"] self.parent_committer = pull_request_data["user"]["login"] self.pull_request_branch = pull_request_data["base"]["ref"] @@ -893,7 +893,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action == "opened": self.logger.info(f"{self.log_prefix} Creating welcome comment") - pull_request_opened_futures: List[Future] = [] + pull_request_opened_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_opened_futures.append( executor.submit(self.pull_request.create_issue_comment, **{"body": self.welcome_msg}) @@ -909,7 +909,7 @@ def process_pull_request_webhook_data(self) -> None: pass if hook_action == "synchronize": - pull_request_synchronize_futures: List[Future] = [] + pull_request_synchronize_futures: list[Future] = [] with ThreadPoolExecutor() as executor: pull_request_synchronize_futures.append(executor.submit(self.remove_labels_when_pull_request_sync)) pull_request_synchronize_futures.append( @@ -946,7 +946,7 @@ def process_pull_request_webhook_data(self) -> None: if hook_action in ("labeled", "unlabeled"): _check_for_merge: bool = False - _reviewer: Optional[str] = None + _reviewer: str | None = None action_labeled = hook_action == "labeled" labeled = self.hook_data["label"]["name"].lower() if labeled == CAN_BE_MERGED_STR: @@ -959,7 +959,7 @@ def process_pull_request_webhook_data(self) -> None: if labeled.startswith(CHANGED_REQUESTED_BY_LABEL_PREFIX): _reviewer = labeled.split(CHANGED_REQUESTED_BY_LABEL_PREFIX)[-1] - _approved_output: Dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} + _approved_output: dict[str, Any] = {"title": "Approved", "summary": "", "text": ""} if _reviewer in self.approvers: _check_for_merge = True _approved_output["text"] += f"Approved by {_reviewer}.\n" @@ -1074,7 +1074,7 @@ def _run_tox(self) -> None: with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir): rc, out, err = run_command(command=cmd, log_prefix=self.log_prefix) - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Tox", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1098,7 +1098,7 @@ def _run_pre_commit(self) -> None: with self._prepare_cloned_repo_dir(clone_repo_dir=clone_repo_dir): rc, out, err = run_command(command=cmd, log_prefix=self.log_prefix) - output: Dict[str, Any] = { + output: dict[str, Any] = { "title": "Pre-Commit", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1109,7 +1109,7 @@ def _run_pre_commit(self) -> None: return self.set_run_pre_commit_check_failure(output=output) def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) -> None: - available_commands: List[str] = [ + available_commands: list[str] = [ COMMAND_RETEST_STR, COMMAND_CHERRY_PICK_STR, COMMAND_ASSIGN_REVIEWERS_STR, @@ -1117,7 +1117,7 @@ def user_commands(self, command: str, reviewed_user: str, issue_comment_id: int) BUILD_AND_PUSH_CONTAINER_STR, ] - command_and_args: List[str] = command.split(" ", 1) + command_and_args: list[str] = command.split(" ", 1) _command = command_and_args[0] _args: str = command_and_args[1] if len(command_and_args) > 1 else "" @@ -1222,7 +1222,7 @@ def cherry_pick(self, target_branch: str, reviewed_user: str = "") -> None: clone_repo_dir = f"{self.clone_repo_dir}-{uuid4()}" git_cmd = f"git --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" hub_cmd = f"GITHUB_TOKEN={self.token} hub --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" - commands: List[str] = [ + commands: list[str] = [ f"{git_cmd} checkout {target_branch}", f"{git_cmd} pull origin {target_branch}", f"{git_cmd} checkout -b {new_branch_name} origin/{target_branch}", @@ -1498,7 +1498,7 @@ def _run_build_container( clone_repo_dir=clone_repo_dir, ): build_rc, build_out, build_err = self.run_podman_command(command=podman_build_cmd) - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Build container", "summary": "", "text": self.get_check_run_text(err=build_err, out=build_out), @@ -1562,7 +1562,7 @@ def _run_install_python_module(self) -> None: log_prefix=self.log_prefix, ) - output: Dict[str, str] = { + output: dict[str, str] = { "title": "Python module installation", "summary": "", "text": self.get_check_run_text(err=err, out=out), @@ -1573,7 +1573,7 @@ def _run_install_python_module(self) -> None: return self.set_python_module_install_failure(output=output) def send_slack_message(self, message: str, webhook_url: str) -> None: - slack_data: Dict[str, str] = {"text": message} + slack_data: dict[str, str] = {"text": message} self.logger.info(f"{self.log_prefix} Sending message to slack: {message}") response: requests.Response = requests.post( webhook_url, @@ -1607,7 +1607,7 @@ def create_comment_reaction(self, issue_comment_id: int, reaction: str) -> None: _comment.create_reaction(reaction) def process_opened_or_synchronize_pull_request(self) -> None: - prepare_pull_futures: List[Future] = [] + prepare_pull_futures: list[Future] = [] with ThreadPoolExecutor() as executor: prepare_pull_futures.append(executor.submit(self.assign_reviewers)) prepare_pull_futures.append( @@ -1646,9 +1646,9 @@ def set_check_run_status( check_run: str, status: str = "", conclusion: str = "", - output: Optional[Dict[str, str]] = None, + output: dict[str, str] | None = None, ) -> None: - kwargs: Dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} + kwargs: dict[str, Any] = {"name": check_run, "head_sha": self.last_commit.sha} if status: kwargs["status"] = status @@ -1679,7 +1679,7 @@ def _prepare_cloned_repo_dir( is_merged: bool = False, checkout: str = "", tag_name: str = "", - ) -> Generator[None, None, None]: + ) -> Generator[None]: git_cmd = f"git --work-tree={clone_repo_dir} --git-dir={clone_repo_dir}/.git" # Clone the repository @@ -1756,7 +1756,7 @@ def get_story_key_with_jira_connection(self) -> str: return "" return _story_key - def get_branch_required_status_checks(self) -> List[str]: + def get_branch_required_status_checks(self) -> list[str]: if self.repository.private: self.logger.info( f"{self.log_prefix} Repository is private, skipping getting branch protection required status checks" @@ -1767,11 +1767,11 @@ def get_branch_required_status_checks(self) -> List[str]: branch_protection = pull_request_branch.get_protection() return branch_protection.required_status_checks.contexts - def get_all_required_status_checks(self) -> List[str]: + def get_all_required_status_checks(self) -> list[str]: if not hasattr(self, "pull_request_branch"): self.pull_request_branch = self.pull_request.base.ref - all_required_status_checks: List[str] = [] + all_required_status_checks: list[str] = [] branch_required_status_checks = self.get_branch_required_status_checks() if self.tox: all_required_status_checks.append(TOX_STR) @@ -1827,8 +1827,8 @@ def set_jira_in_pull_request(self) -> None: def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, reviewed_user: str) -> None: self.create_comment_reaction(issue_comment_id=issue_comment_id, reaction=REACTIONS.ok) - _target_branches: List[str] = command_args.split() - _exits_target_branches: Set[str] = set() + _target_branches: list[str] = command_args.split() + _exits_target_branches: set[str] = set() _non_exits_target_branches_msg: str = "" for _target_branch in _target_branches: @@ -1845,7 +1845,7 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, if _exits_target_branches: if not self.pull_request.is_merged(): - cp_labels: List[str] = [ + cp_labels: list[str] = [ f"{CHERRY_PICK_LABEL_PREFIX}{_target_branch}" for _target_branch in _exits_target_branches ] info_msg: str = f""" @@ -1864,10 +1864,10 @@ def process_cherry_pick_command(self, issue_comment_id: int, command_args: str, ) def process_retest_command(self, issue_comment_id: int, command_args: str) -> None: - _target_tests: List[str] = command_args.split() - _not_supported_retests: List[str] = [] - _supported_retests: List[str] = [] - _retests_to_func_map: Dict[str, Callable] = { + _target_tests: list[str] = command_args.split() + _not_supported_retests: list[str] = [] + _supported_retests: list[str] = [] + _retests_to_func_map: dict[str, Callable] = { TOX_STR: self._run_tox, PRE_COMMIT_STR: self._run_pre_commit, BUILD_CONTAINER_STR: self._run_build_container, @@ -1909,7 +1909,7 @@ def process_retest_command(self, issue_comment_id: int, command_args: str) -> No if _supported_retests: self.create_comment_reaction(issue_comment_id=issue_comment_id, reaction=REACTIONS.ok) - _retest_to_exec: List[Future] = [] + _retest_to_exec: list[Future] = [] with ThreadPoolExecutor() as executor: for _test in _supported_retests: _retest_to_exec.append(executor.submit(_retests_to_func_map[_test])) @@ -2039,8 +2039,8 @@ def set_pull_request_automerge(self) -> None: self.logger.error(f"{self.log_prefix} Exception while setting auto merge: {exp}") @property - def _current_pull_request_supported_retest(self) -> List[str]: - current_pull_request_supported_retest: List[str] = [] + def _current_pull_request_supported_retest(self) -> list[str]: + current_pull_request_supported_retest: list[str] = [] if self.tox: current_pull_request_supported_retest.append(TOX_STR) @@ -2065,7 +2065,7 @@ def fix_podman_bug(self) -> None: shutil.rmtree("/tmp/storage-run-1000/containers", ignore_errors=True) shutil.rmtree("/tmp/storage-run-1000/libpod/tmp", ignore_errors=True) - def run_podman_command(self, command: str) -> Tuple[bool, str, str]: + def run_podman_command(self, command: str) -> tuple[bool, str, str]: rc, out, err = run_command(command=command, log_prefix=self.log_prefix) if rc: return rc, out, err diff --git a/webhook_server_container/utils/helpers.py b/webhook_server_container/utils/helpers.py index 4a2f0833..f974aace 100644 --- a/webhook_server_container/utils/helpers.py +++ b/webhook_server_container/utils/helpers.py @@ -4,7 +4,7 @@ import shlex import subprocess from concurrent.futures import Future, as_completed -from typing import Any, Dict, List, Optional, Tuple +from typing import Any from colorama import Fore from github import Github from github.RateLimit import RateLimit @@ -15,10 +15,10 @@ def get_value_from_dicts( - primary_dict: Dict[Any, Any], - secondary_dict: Dict[Any, Any], + primary_dict: dict[Any, Any], + secondary_dict: dict[Any, Any], key: str, - return_on_none: Optional[Any] = None, + return_on_none: Any | None = None, ) -> Any: """ Get value from two dictionaries. @@ -28,10 +28,10 @@ def get_value_from_dicts( return primary_dict.get(key, secondary_dict.get(key, return_on_none)) -def get_logger_with_params(name: str, repository_name: Optional[str] = "") -> Logger: +def get_logger_with_params(name: str, repository_name: str | None = "") -> Logger: _config = Config() config_data = _config.data # Global repositories configuration - repo_data: Dict[str, Any] = {} + repo_data: dict[str, Any] = {} if repository_name: repo_data = _config.repository_data(repository_name=repository_name) # Specific repository configuration @@ -43,18 +43,16 @@ def get_logger_with_params(name: str, repository_name: Optional[str] = "") -> Lo return get_logger(name=name, filename=log_file, level=log_level, file_max_bytes=1048576 * 50) # 50MB -def extract_key_from_dict(key: Any, _dict: Dict[Any, Any]) -> Any: +def extract_key_from_dict(key: Any, _dict: dict[Any, Any]) -> Any: if isinstance(_dict, dict): for _key, _val in _dict.items(): if _key == key: yield _val if isinstance(_val, dict): - for result in extract_key_from_dict(key, _val): - yield result + yield from extract_key_from_dict(key, _val) elif isinstance(_val, list): for _item in _val: - for result in extract_key_from_dict(key, _item): - yield result + yield from extract_key_from_dict(key, _item) def get_github_repo_api(github_api: Github, repository: int | str) -> Repository: @@ -66,11 +64,11 @@ def run_command( log_prefix: str, verify_stderr: bool = False, shell: bool = False, - timeout: Optional[int] = None, + timeout: int | None = None, capture_output: bool = True, check: bool = False, **kwargs: Any, -) -> Tuple[bool, str, str]: +) -> tuple[bool, str, str]: """ Run command locally. @@ -125,8 +123,8 @@ def run_command( return False, out_decoded, err_decoded -def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> List[Tuple[Github, str]]: - apis_and_tokens: List[Tuple[Github, str]] = [] +def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> list[tuple[Github, str]]: + apis_and_tokens: list[tuple[Github, str]] = [] tokens = get_value_from_dicts( primary_dict=config.repository_data(repository_name=repository_name), @@ -141,7 +139,7 @@ def get_apis_and_tokes_from_config(config: Config, repository_name: str = "") -> return apis_and_tokens -def get_api_with_highest_rate_limit(config: Config, repository_name: str = "") -> Tuple[Github | None, str | None]: +def get_api_with_highest_rate_limit(config: Config, repository_name: str = "") -> tuple[Github | None, str | None]: """ Get API with the highest rate limit @@ -154,10 +152,10 @@ def get_api_with_highest_rate_limit(config: Config, repository_name: str = "") - """ logger = get_logger_with_params(name="helpers") - api: Optional[Github] = None - token: Optional[str] = None + api: Github | None = None + token: str | None = None _api_user: str = "" - rate_limit: Optional[RateLimit] = None + rate_limit: RateLimit | None = None remaining = 0 @@ -203,7 +201,7 @@ def log_rate_limit(rate_limit: RateLimit, api_user: str) -> None: logger.warning(msg) -def get_future_results(futures: List["Future"]) -> None: +def get_future_results(futures: list[Future]) -> None: """ result must return Tuple[bool, str, Callable] when the Callable is Logger function (LOGGER.info, LOGGER.error, etc) """ From 05b1ba6daa40a6478a2d0732926257db0705f2d7 Mon Sep 17 00:00:00 2001 From: Meni Yakove Date: Sat, 9 Nov 2024 12:05:29 +0200 Subject: [PATCH 2/2] Dockerfile: get uv from pip --- Dockerfile | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 95ebf08f..c0e82a12 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,6 @@ EXPOSE 5000 ENV USERNAME="podman" ENV HOME_DIR="/home/$USERNAME" ENV BIN_DIR="$HOME_DIR/.local/bin" -ENV UV_INSTALL_DIR="$HOME_DIR/.local" ENV PATH="$PATH:$BIN_DIR" ENV DATA_DIR="$HOME_DIR/data" ENV APP_DIR="$HOME_DIR/github-webhook-server" @@ -36,10 +35,7 @@ RUN usermod --add-subuids 100000-165535 --add-subgids 100000-165535 $USERNAME \ USER $USERNAME WORKDIR $HOME_DIR -# Download the latest uv installer -RUN curl -sSL https://astral.sh/uv/install.sh -o /tmp/uv-installer.sh \ - && sh /tmp/uv-installer.sh \ - && rm /tmp/uv-installer.sh +RUN python3 -m pip install uv RUN set -x \ && curl https://mirror.openshift.com/pub/openshift-v4/clients/rosa/latest/rosa-linux.tar.gz --output $BIN_DIR/rosa-linux.tar.gz \