Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 46 additions & 75 deletions webhook_server_container/libs/github_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from stringcolor import cs

from github.Branch import Branch
from github.ContentFile import ContentFile
import requests
import shortuuid
from starlette.datastructures import Headers
Expand Down Expand Up @@ -203,7 +202,7 @@ def process(self) -> None:
self.last_committer = getattr(self.last_commit.committer, "login", self.parent_committer)
self.changed_files = self.list_changed_commit_files()
self.pull_request_branch = self.pull_request.base.ref
self.approvers_and_reviewers = self.get_approvers_and_reviewers()
self.all_approvers_and_reviewers = self.get_all_approvers_and_reviewers()
self.all_approvers = self.get_all_approvers()
self.all_reviewers = self.get_all_reviewers()

Expand Down Expand Up @@ -581,43 +580,15 @@ def _error(_out: str, _err: str) -> None:
"""
self.send_slack_message(message=message, webhook_url=self.slack_webhook_url)

def get_owners_content(self, folder_path: str = "") -> Dict[str, Any]:
if folder_path:
# Normalize path and check for directory traversal
norm_path = os.path.normpath(folder_path)
if (
norm_path.startswith("/")
or norm_path.startswith("\\")
or ".." in norm_path
or not all(part.isalnum() or part in "-_" for part in norm_path.split(os.path.sep))
):
self.logger.error(f"{self.log_prefix} Invalid folder path: {folder_path}")
return {}

try:
owners_path = f"{folder_path}/OWNERS" if folder_path else "OWNERS"
owners_content: list[ContentFile] | ContentFile = self.repository.get_contents(owners_path)
if isinstance(owners_content, list):
self.logger.debug(f"{self.log_prefix} Found more than one OWNERS file, using the first one")
owners_content = owners_content[0]

_content: Dict[str, Any] = yaml.safe_load(owners_content.decoded_content)
self.logger.debug(f"{self.log_prefix} OWNERS file content: {_content}")
return _content

except UnknownObjectException:
self.logger.error(f"{self.log_prefix} OWNERS file not found")
return {}

@property
def root_reviewers(self) -> List[str]:
_reviewers = self.approvers_and_reviewers.get(".", {}).get("reviewers", [])
_reviewers = self.all_approvers_and_reviewers.get(".", {}).get("reviewers", [])
self.logger.debug(f"{self.log_prefix} ROOT Reviewers: {_reviewers}")
return _reviewers

@property
def root_approvers(self) -> List[str]:
_approvers = self.approvers_and_reviewers.get(".", {}).get("approvers", [])
_approvers = self.all_approvers_and_reviewers.get(".", {}).get("approvers", [])
self.logger.debug(f"{self.log_prefix} ROOT Approvers: {_approvers}")
return _approvers

Expand Down Expand Up @@ -1327,8 +1298,9 @@ def check_if_can_be_merged(self) -> None:
_labels = self.pull_request_labels_names()
self.logger.debug(f"{self.log_prefix} check if can be merged. PR labels are: {_labels}")

if not self.pull_request.mergeable:
failure_output += "PR is not mergeable: {self.pull_request.mergeable_state}\n"
is_pr_mergable = self.pull_request.mergeable
if not is_pr_mergable:
failure_output += f"PR is not mergeable: {is_pr_mergable}\n"

required_check_in_progress_failure_output, check_runs_in_progress = self._required_check_in_progress(
last_commit_check_runs=last_commit_check_runs
Expand Down Expand Up @@ -2027,9 +1999,9 @@ def run_podman_command(self, command: str, pipe: bool = False) -> Tuple[bool, st

return rc, out, err

def get_approvers_and_reviewers(self) -> dict[str, dict[str, list[str]]]:
def get_all_approvers_and_reviewers(self) -> dict[str, dict[str, Any]]:
# Dictionary mapping OWNERS file paths to their approvers and reviewers
_owners: dict[str, dict[str, list[str]]] = {}
_owners: dict[str, dict[str, Any]] = {}

max_owners_files = 1000 # Configurable limit
owners_count = 0
Expand All @@ -2050,7 +2022,6 @@ def get_approvers_and_reviewers(self) -> dict[str, dict[str, list[str]]]:
try:
content = yaml.safe_load(_path.decoded_content)
if self._validate_owners_content(content, content_path):
# Use Path for consistent path handling
parent_path = str(Path(content_path).parent)
if not parent_path:
parent_path = "."
Expand All @@ -2065,44 +2036,47 @@ def get_approvers_and_reviewers(self) -> dict[str, dict[str, list[str]]]:

def get_all_approvers(self) -> list[str]:
_approvers: list[str] = []
for list_of_approvers in self.owners_data_for_changed_files()["approvers"]:
for _approver in list_of_approvers:
for list_of_approvers in self.owners_data_for_changed_files().values():
for _approver in list_of_approvers.get("approvers", []):
_approvers.append(_approver)

reviewers = list(set(self.root_approvers + _approvers))
reviewers.sort()
return reviewers
_approvers.sort()
return _approvers

def get_all_reviewers(self) -> list[str]:
_reviewers: list[str] = []
for list_of_reviewers in self.owners_data_for_changed_files()["reviewers"]:
for _approver in list_of_reviewers:
for list_of_reviewers in self.owners_data_for_changed_files().values():
for _approver in list_of_reviewers.get("reviewers", []):
_reviewers.append(_approver)

approvers = list(set(self.root_reviewers + _reviewers))
approvers.sort()
return approvers
_reviewers.sort()
return _reviewers

def owners_data_for_changed_files(self) -> dict[str, list[list[str]]]:
data: dict[str, list[list[str]]] = {"approvers": [], "reviewers": []}
def owners_data_for_changed_files(self) -> dict[str, dict[str, Any]]:
data: dict[str, dict[str, Any]] = {}

changed_folders = {Path(cf).parent for cf in self.changed_files}

for changed_folder_path in changed_folders:
for owners_dir, owners_data in self.approvers_and_reviewers.items():
_owners_dir = Path(owners_dir)
changed_folder_match: list[str] = []

require_root_approvers: bool = False

if _owners_dir == changed_folder_path or _owners_dir in changed_folder_path.parents:
_reviewers = owners_data.get("reviewers", [])
self.logger.debug(f"{self.log_prefix} Found reviewers for {owners_dir}: {_reviewers}")
data["reviewers"].append(_reviewers)
for owners_dir, owners_data in self.all_approvers_and_reviewers.items():
_owners_dir = Path(owners_dir)

_approvers = owners_data.get("approvers", [])
self.logger.debug(f"{self.log_prefix} Found approvers for {owners_dir}: {_approvers}")
data["approvers"].append(_approvers)
for changed_folder in changed_folders:
if _owners_dir == changed_folder or _owners_dir.parents == changed_folders:
data[owners_dir] = owners_data
Comment on lines +2067 to +2069
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix path comparison logic in owners_data_for_changed_files

There's a critical issue in the path comparison logic. The current implementation incorrectly compares _owners_dir.parents (which is a sequence of paths) with changed_folders (which is a set).

Apply this fix:

-                if _owners_dir == changed_folder or _owners_dir.parents == changed_folders:
+                if _owners_dir == changed_folder or changed_folder in _owners_dir.parents:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for changed_folder in changed_folders:
if _owners_dir == changed_folder or _owners_dir.parents == changed_folders:
data[owners_dir] = owners_data
for changed_folder in changed_folders:
if _owners_dir == changed_folder or changed_folder in _owners_dir.parents:
data[owners_dir] = owners_data

changed_folder_match.append(owners_dir)
if not require_root_approvers:
require_root_approvers = owners_data.get("root-approvers", True)

if [_folder for _folder in changed_folders if str(_folder) not in changed_folder_match]:
data["."] = self.all_approvers_and_reviewers.get(".", {})

if require_root_approvers:
data["."] = self.all_approvers_and_reviewers.get(".", {})

data["reviewers"].sort()
data["approvers"].sort()
return data

def _validate_owners_content(self, content: Any, path: str) -> bool:
Expand Down Expand Up @@ -2201,14 +2175,6 @@ def _check_lables_for_can_be_merged(self, labels: list[str]) -> str:
return failure_output

def _check_if_pr_approved(self, labels: list[str]) -> str:
_pr_approvers: list[str] = []
all_needed_approvers = []
for approvers_list in self.owners_data_for_changed_files()["approvers"]:
if approvers_list not in all_needed_approvers:
all_needed_approvers.append(approvers_list)

# all_needed_approvers is [['approver1', 'approver2'], ['approver3', 'approver4']]
# To mark PR as approved we need at least one lgtm/approved from each nested list inside all_needed_approvers
approved_by = []
for _label in labels:
if APPROVED_BY_LABEL_PREFIX.lower() in _label.lower():
Expand All @@ -2220,15 +2186,20 @@ def _check_if_pr_approved(self, labels: list[str]) -> str:

missing_approvers = self.all_approvers.copy()

for owners_data in self.approvers_and_reviewers.values():
_approvers = owners_data.get("approvers", [])
for approver in _approvers:
if approver in approved_by:
_pr_approvers.append(approver)
for data in self.owners_data_for_changed_files().values():
required_pr_approvers = data.get("approvers", [])
for required_pr_approver in required_pr_approvers:
if required_pr_approver in approved_by:
# Once we found approver in approved_by list, we remove all approvers from missing_approvers list for this owners file
{missing_approvers.remove(_approver) for _approver in _approvers if _approver in missing_approvers} # type: ignore
{
missing_approvers.remove(_approver) # type: ignore
for _approver in required_pr_approvers
if _approver in missing_approvers
}
Comment on lines +2189 to +2198
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor approval validation to avoid side effects in comprehension

The current implementation uses a set comprehension for its side effects, which is an anti-pattern. Comprehensions should be used for creating new collections, not for side effects.

Apply this refactor:

-                    {
-                        missing_approvers.remove(_approver)  # type: ignore
-                        for _approver in required_pr_approvers
-                        if _approver in missing_approvers
-                    }
+                    for _approver in required_pr_approvers:
+                        if _approver in missing_approvers:
+                            missing_approvers.remove(_approver)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for data in self.owners_data_for_changed_files().values():
required_pr_approvers = data.get("approvers", [])
for required_pr_approver in required_pr_approvers:
if required_pr_approver in approved_by:
# Once we found approver in approved_by list, we remove all approvers from missing_approvers list for this owners file
{missing_approvers.remove(_approver) for _approver in _approvers if _approver in missing_approvers} # type: ignore
{
missing_approvers.remove(_approver) # type: ignore
for _approver in required_pr_approvers
if _approver in missing_approvers
}
for data in self.owners_data_for_changed_files().values():
required_pr_approvers = data.get("approvers", [])
for required_pr_approver in required_pr_approvers:
if required_pr_approver in approved_by:
# Once we found approver in approved_by list, we remove all approvers from missing_approvers list for this owners file
for _approver in required_pr_approvers:
if _approver in missing_approvers:
missing_approvers.remove(_approver)

break

missing_approvers = list(set(missing_approvers))

if missing_approvers:
return f"Missing lgtm/approved from approvers: {', '.join(missing_approvers)}\n"

Expand Down
Loading