diff --git a/.gitignore b/.gitignore index d5a9454ec..a56512c06 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,6 @@ custom_profiles/ /tools/security/* Apps/Windows/inspector.exe Apps/Windows/Element.xml -Framework/settings.conf.lock \ No newline at end of file +Framework/settings.conf.lock +Framework/Built_In_Automation/Desktop/Linux/latest_app.txt +**/linux_screen.png \ No newline at end of file diff --git a/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py b/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py new file mode 100644 index 000000000..82386d844 --- /dev/null +++ b/Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py @@ -0,0 +1,1564 @@ +import re +import inspect +import time +import subprocess +import sys +import os +import glob +from typing import List, Literal, Tuple, Optional, Any, Callable + +from Framework.module_installer import install_missing_modules + +try: + import pyatspi + from pyatspi.action import Action + from pyatspi.editabletext import EditableText, Text +except ImportError: + install_missing_modules(["python3-pyatspi==1.19.0", "pygobject==3.50.1"]) + try: + import pyatspi + from pyatspi.action import Action + from pyatspi.editabletext import EditableText, Text + except ImportError: + sys.stderr.write( + "Error: system dependency is not installed. Install them by running Installer/setup_linux_inspector.sh.\n" + ) + sys.exit(1) + +from Framework.Utilities import CommonUtil +from Framework.Built_In_Automation.Shared_Resources import ( + BuiltInFunctionSharedResources as Shared_Resources, +) +from Framework.Utilities.decorators import logger + + +class Collection: ... + + +class Component: ... + + +class Document: ... + + +class Hypertext: ... + + +class Image: ... + + +class Selection: ... + + +class Table: ... + + +class TableCell: ... + + +class Value: ... + + +DataSet = List[Tuple[str, str, str]] + + +def getInterface(iface_func: Callable, obj: Any) -> Any: ... + + +class Accessible: + def __init__(self): ... + + def get_child_at_index(self, index: int) -> "Accessible": ... + def get_attributes_as_array(self) -> List[str]: ... + def get_application(self) -> Optional["Accessible"]: ... + def get_child_count(self) -> int: ... + def get_index_in_parent(self) -> int: ... + def get_localized_role_name(self) -> str: ... + def get_relation_set(self) -> Any: ... + def get_role(self) -> int: ... + def get_role_name(self) -> str: ... + def get_state_set(self) -> Any: ... + def get_description(self) -> Optional[str]: ... + def get_object_locale(self) -> str: ... + def get_name(self) -> Optional[str]: ... + def get_parent(self) -> Optional["Accessible"]: ... + def set_cache_mask(self, mask: int) -> None: ... + def clear_cache(self) -> None: ... + def get_id(self) -> str: ... + def get_toolkit_name(self) -> str: ... + def get_toolkit_version(self) -> str: ... + def get_atspi_version(self) -> str: ... + + def __getitem__(self, index: int) -> "Accessible": ... + def __len__(self) -> int: ... + def __bool__(self) -> bool: ... + def __str__(self) -> str: ... + def isEqual(self, other: "Accessible") -> bool: ... + + # Properties + childCount: int + description: Optional[str] + objectLocale: str + name: Optional[str] + parent: Optional["Accessible"] + id: str + toolkitName: str + toolkitVersion: str + atspiVersion: str + + # Query interfaces + def queryAction(self) -> Action: ... + def queryCollection(self) -> Collection: ... + def queryComponent(self) -> Component: ... + def queryDocument(self) -> Document: ... + def queryEditableText(self) -> EditableText: ... + def queryHyperlink(self) -> Any: ... + def queryHypertext(self) -> Hypertext: ... + def queryImage(self) -> Image: ... + def querySelection(self) -> Selection: ... + def queryTable(self) -> Table: ... + def queryTableCell(self) -> TableCell: ... + def queryText(self) -> Text: ... + def queryValue(self) -> Value: ... + + +MODULE_NAME = inspect.getmodulename(__file__) or "BuiltInFunctions" +ui_xml_strings = [] # needed for generating XML tree +LATEST_APP_FILE = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "latest_app.txt" +) + + +def save_latest_app_name(app_name: str): + """Save the latest used application name to a file.""" + try: + with open(LATEST_APP_FILE, "w") as f: + f.write(app_name) + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"Failed to save latest app name: {e}", 3) + + +def get_latest_app_name() -> str | None: + """Get the latest used application name from the file.""" + try: + if os.path.exists(LATEST_APP_FILE): + with open(LATEST_APP_FILE, "r") as f: + return f.read().strip() + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"Failed to get latest app name: {e}", 3) + return None + + +def _get_window_id_for_app(app_name: str | None) -> str | None: + """Return a window id for the requested app name, or None if not found. + + - If app_name is provided, tries to find the first visible window with that name using xdotool. + - Falls back to the active window using xdotool if no app_name was resolved or found. + """ + try: + if app_name: + # search for visible window by name (use substring regex match) + # use case-insensitive matching in regex + pattern = f"(?i).*{re.escape(app_name)}.*" + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--name", pattern], + capture_output=True, + text=True, + ) + win_lines = [l for l in res.stdout.splitlines() if l.strip()] + if win_lines: + return win_lines[0].strip() + # try class match + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--class", pattern], + capture_output=True, + text=True, + ) + win_lines = [l for l in res.stdout.splitlines() if l.strip()] + if win_lines: + return win_lines[0].strip() + # try matching by exec command (from desktop file) or by process name (pgrep) + try: + app_key, matched_name, exec_cmd = find_best_app_match(app_name) or ( + None, + None, + None, + ) + except Exception: + app_key, matched_name, exec_cmd = (None, None, None) + + if exec_cmd: + # try to find processes using exec_cmd + for pid in get_process_ids(exec_cmd): + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--pid", str(pid)], + capture_output=True, + text=True, + ) + win_lines = [l for l in res.stdout.splitlines() if l.strip()] + if win_lines: + return win_lines[0].strip() + + # try matching by pid for processes that match app_name + for pid in get_process_ids(app_name): + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--pid", str(pid)], + capture_output=True, + text=True, + ) + win_lines = [l for l in res.stdout.splitlines() if l.strip()] + if win_lines: + return win_lines[0].strip() + # as a last resort, iterate visible windows and check names for substring match + res = subprocess.run( + ["xdotool", "search", "--onlyvisible", "--name", ".*"], + capture_output=True, + text=True, + ) + win_lines = [l for l in res.stdout.splitlines() if l.strip()] + for wid in win_lines: + try: + name = subprocess.run( + ["xdotool", "getwindowname", wid], + capture_output=True, + text=True, + ).stdout.strip() + if app_name.lower() in name.lower(): + return wid.strip() + except Exception: + continue + # fallback to active window + res = subprocess.run( + ["xdotool", "getactivewindow"], capture_output=True, text=True + ) + winid = res.stdout.strip() + if winid: + CommonUtil.ExecLog( + MODULE_NAME, f"Selected window id {winid} for app '{app_name}'", 1 + ) + CommonUtil.ExecLog( + MODULE_NAME, f"Trying xwd/convert capture for window id: {winid}", 1 + ) + return winid + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"Window lookup error: {e}", 3) + return None + + +def capture_screenshot(file_path: str, app_name: str | None = None) -> bool: + """Capture screenshot of the desired window. + + On macOS: Uses AppleScript + screencapture to capture the frontmost window. + On Linux: Uses xwd (and ImageMagick convert), falling back to scrot/gnome-screenshot/import if necessary. + + The function will try to capture the latest opened application window if available + (via `get_latest_app_name()`); otherwise it will capture the currently active window. + """ + desired_app = app_name or get_latest_app_name() + + # Check if running on macOS + if sys.platform == "darwin": + if not desired_app: + CommonUtil.ExecLog( + MODULE_NAME, "App name is required for macOS screenshot capture", 3 + ) + return False + + try: + # Use AppleScript to get the Window ID of the frontmost window + osascript_command = ( + f"osascript -e 'tell application \"{desired_app}\" to id of window 1'" + ) + result = subprocess.run( + osascript_command, + shell=True, + check=True, + capture_output=True, + text=True, + ) + window_id = result.stdout.strip() + + if not window_id.isdigit(): + CommonUtil.ExecLog( + MODULE_NAME, + f"Could not retrieve a valid Window ID for '{desired_app}'. Is the app running and does it have a window open?", + 3, + ) + return False + + CommonUtil.ExecLog(MODULE_NAME, f"Found Window ID: {window_id}", 1) + + # Use screencapture with the -l (limit) flag to target the Window ID + screencapture_command = ["screencapture", "-l", window_id, file_path] + subprocess.run(screencapture_command, check=True, capture_output=True) + + if os.path.exists(file_path) and os.path.getsize(file_path) > 0: + CommonUtil.ExecLog( + MODULE_NAME, + f"Successfully captured '{desired_app}' window to: {file_path}", + 1, + ) + return True + else: + CommonUtil.ExecLog( + MODULE_NAME, "Screenshot file was not created or is empty", 3 + ) + return False + + except subprocess.CalledProcessError as e: + CommonUtil.ExecLog( + MODULE_NAME, + f"macOS screenshot capture failed: {e.stderr.decode() if e.stderr else str(e)}", + 3, + ) + return False + except FileNotFoundError: + CommonUtil.ExecLog( + MODULE_NAME, + "osascript or screencapture command not found (should be present on macOS)", + 3, + ) + return False + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"macOS screenshot capture failed: {e}", 3) + return False + + # Linux screenshot logic + try: + winid = _get_window_id_for_app(desired_app) + if winid: + # Try to use xwd + convert (ImageMagick) to create the requested file + # If convert is not available, xwd will produce an .xwd output (which may not be desired) + # We'll attempt convert and if it fails fall back to writing xwd file then try to convert + convert_available = ( + subprocess.run( + ["which", "convert"], capture_output=True, text=True + ).returncode + == 0 + ) + if convert_available: + # Run xwd and pipe to convert which will write the final file + p1 = subprocess.Popen( + ["xwd", "-silent", "-id", winid], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + p2 = subprocess.Popen( + ["convert", "xwd:-", file_path], + stdin=p1.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if p1.stdout: + p1.stdout.close() + out, err = p2.communicate() + if ( + p2.returncode == 0 + and os.path.exists(file_path) + and os.path.getsize(file_path) > 0 + ): + return True + else: + # convert not available, write xwd to file and then optionally convert using import + tmp_xwd = ( + file_path if file_path.endswith(".xwd") else f"{file_path}.xwd" + ) + exit_code = subprocess.run( + ["xwd", "-silent", "-id", winid, "-out", tmp_xwd], + capture_output=True, + ).returncode + if ( + exit_code == 0 + and os.path.exists(tmp_xwd) + and os.path.getsize(tmp_xwd) > 0 + ): + # If desired output wasn't .xwd and ImageMagick 'convert' exists, try to convert + if ( + not file_path.endswith(".xwd") + and subprocess.run( + ["which", "convert"], capture_output=True + ).returncode + == 0 + ): + conv_exit = subprocess.run( + ["convert", tmp_xwd, file_path], capture_output=True + ).returncode + if ( + conv_exit == 0 + and os.path.exists(file_path) + and os.path.getsize(file_path) > 0 + ): + # remove the temporary xwd file + try: + os.remove(tmp_xwd) + except Exception: + pass + return True + # If the caller wanted .xwd (or conversion not possible), move or rename temporary file + if tmp_xwd != file_path: + try: + os.replace(tmp_xwd, file_path) + except Exception: + pass + if os.path.exists(file_path) and os.path.getsize(file_path) > 0: + return True + except FileNotFoundError: + # xwd not present + CommonUtil.ExecLog(MODULE_NAME, "xwd command not found", 3) + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"xwd/convert screenshot failed: {e}", 3) + + CommonUtil.ExecLog( + MODULE_NAME, + "Failed to capture screenshot. Ensure xwd, xdotool, and ImageMagick (convert) are installed.", + 3, + ) + return False + + +def convert_data_set_to_dict(data_set: DataSet) -> dict[str, str]: + """Convert data set to dictionary for easier access""" + # ToDo: handle * and ** properly + data_dict = {} + for item in data_set: + if len(item) == 3: + key, _, value = item + if key.startswith("**"): + data_dict[key[2:].strip()] = value + data_dict["exact_" + key[2:].strip()] = "false" + data_dict["case_sensitive_" + key[2:].strip()] = "false" + elif key.startswith("*"): + data_dict[key[1:].strip()] = value + data_dict["exact_" + key[1:].strip()] = "false" + else: + data_dict[key.strip()] = value + else: + CommonUtil.ExecLog(MODULE_NAME, f"Invalid item in data set: {item}", 3) + return data_dict + + +def simulate_keyboard_typing( + app_name: str | None, node: Accessible | None, text: str +) -> bool: + if node: + action_iface = node.queryAction() + if action_iface and action_iface.nActions > 0: + for i in range(action_iface.nActions): + action_name = action_iface.getName(i) + if "activate" in action_name.lower(): + action_iface.doAction(i) + try: + if app_name: + app_window = ( + subprocess.run( + ["xdotool", "search", "--name", app_name], + capture_output=True, + text=True, + ) + .stdout.strip() + .split("\n")[0] + ) + if app_window: + subprocess.run( + ["xdotool", "windowactivate", app_window], capture_output=True + ) + else: + CommonUtil.ExecLog( + MODULE_NAME, f"Application window for '{app_name}' not found.", 3 + ) + return False + except: + pass + + time.sleep(0.2) + subprocess.run(["xdotool", "type", "--delay", "50", text], capture_output=True) + return True + + +def get_attributes(accessible): + attrs = accessible.getAttributes() + attr_str = "" + if attrs: + for attr in attrs: + if ":" in attr: + key, value = attr.split(":", 1) + safe_value = ( + value.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + attr_str += f' {key}="{safe_value}"' + return attr_str + + +def get_extended_info(accessible): + info_str = "" + try: + description = accessible.description + if description: + safe_desc = ( + description.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + info_str += f' description="{safe_desc}"' + except Exception: + pass + try: + state_set = accessible.getStateSet() + states = [pyatspi.stateToString(s) or "" for s in state_set.getStates()] + if states: + info_str += f' states="{",".join(states)}"' + except Exception: + pass + try: + action_iface = accessible.queryAction() + if action_iface and action_iface.nActions > 0: + actions = [action_iface.getName(i) for i in range(action_iface.nActions)] + info_str += f' actions="{",".join(actions)}"' + except Exception: + pass + return info_str + + +def get_position_info(accessible): + """Return position string for XML with coordinates relative to: + - 'desktop' (default): absolute coordinates using DESKTOP_COORDS + - 'app': relative to the application's window origin + - 'parent': relative to the immediate parent's origin + + If computing a relative coordinate fails, falls back to desktop coordinates. + """ + position_str = "" + try: + component_iface = accessible.queryComponent() + if component_iface: + # Get absolute (desktop) position first + x_abs, y_abs = component_iface.getPosition(pyatspi.DESKTOP_COORDS) + width, height = component_iface.getSize() + x, y = x_abs, y_abs + + position_str += f' x="{x}" y="{y}"' + position_str += f' width="{width}" height="{height}"' + except Exception: + pass + + return position_str + + +def dump_node( + node: Accessible, indent_level=0, path=[], recursive=True +) -> list[str] | None: + global ui_xml_strings + if not recursive: + ui_xml_strings = [] + if not node: + return + + indent = " " * indent_level + role = node.get_role_name().replace(" ", "_") + name = node.name or "" + safe_name = ( + name.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + + attributes = get_attributes(node) + get_extended_info(node) + position_info = get_position_info(node) + path_str = ".".join(map(str, path)) + path_attr = f' path="{path_str}"' + + iface_attrs = "" + text_content_attr = "" + + try: + text_iface = node.queryText() + if text_iface: + try: + raw_text = text_iface.getText(0, -1).strip() + raw_text = raw_text.strip("\ufffc") + if raw_text: + safe_text = ( + raw_text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + text_content_attr = f' text="{safe_text}"' + except Exception: + pass + except NotImplementedError: + pass + + try: + if node.queryEditableText(): + iface_attrs += ' editable_text_iface="true"' + except NotImplementedError: + pass + + child_count = node.childCount + if child_count > 0: + ui_xml_strings.append( + f'{indent}<{role} name="{safe_name}"{attributes}{path_attr}{position_info}{iface_attrs}{text_content_attr}>' + ) + for i in range(child_count): + child = node.get_child_at_index(i) + if recursive: + dump_node(child, indent_level + 1, path + [i], recursive=recursive) + ui_xml_strings.append(f"{indent}") + else: + ui_xml_strings.append( + f'{indent}<{role} name="{safe_name}"{attributes}{path_attr}{position_info}{iface_attrs}{text_content_attr}/>' + ) + if not recursive: + return ui_xml_strings + + +def get_ui_tree(app_keyword) -> str | None: + global ui_xml_strings + desktop = pyatspi.Registry.getDesktop(0) + target_app = None + + for app in desktop: + if app and app_keyword in app.name.lower(): + target_app = app + break + + if target_app: + for i in range(desktop.childCount): + if desktop.getChildAtIndex(i) == target_app: + break + ui_xml_strings = [''] + dump_node(target_app, 0, path=[]) + return "\n".join(ui_xml_strings) + else: + CommonUtil.ExecLog( + MODULE_NAME, f"Error: Application matching '{app_keyword}' not found.", 3 + ) + return None + + +def get_paths_by_text( + xml_content: str, search_text: str, exact_match=True, case_sensitive=True +) -> list[str]: + content_to_search = xml_content + + if not case_sensitive: + search_text = search_text.lower() + content_to_search = content_to_search.lower() + + if exact_match: + pattern = re.compile( + r'text="{}"\s+[^>]*?path="([^"]+)"|path="([^"]+)"[^>]*?text="{}"'.format( + re.escape(search_text), re.escape(search_text), re.escape(search_text) + ) + ) + else: + pattern = re.compile( + r'text="[^"]*{}[^"]*"[^>]*?path="([^"]+)"|path="([^"]+)"[^>]*?text="[^"]*{}[^"]*"'.format( + re.escape(search_text), re.escape(search_text) + ) + ) + + matches = pattern.findall(content_to_search) + paths = [] + for match in matches: + path = match[0] if match[0] else match[1] + if path and path not in paths: + paths.append(path) + + return sorted(paths) + + +def get_parent_path_from_paths(paths: list[str]) -> list[str]: + """ + Sometimes multiple paths are returned for the same element. + They may have parent-child relations. This function identifies + all parent paths by removing children whose parent exists in the list. + + Returns a list of parent paths (paths that are not prefixes of any other path). + """ + if not paths: + return [] + + # Sort by length to process shorter (potential parent) paths first + sorted_paths = sorted(set(paths), key=lambda x: len(x)) + parents = [] + + for i, path in enumerate(sorted_paths): + # Check if this path is a parent of any other path + is_parent = False + for j in range(i + 1, len(sorted_paths)): + # Check if sorted_paths[j] starts with path followed by a dot + # This ensures "0.0.0" doesn't match "0.0.0.1" incorrectly + if sorted_paths[j].startswith(path + "."): + is_parent = True + break + + # If this path is not a parent of any remaining path, it's a leaf or standalone parent + if not is_parent: + parents.append(path) + + return parents + + +def get_path_appname_from_dataset( + data_dict: dict[str, str], + wait_time=Shared_Resources.Get_Shared_Variables("element_wait"), +) -> tuple[str | None, str | None]: + path, app_name = data_dict.get("path"), data_dict.get("app_name") + wait_time = float(data_dict.get("wait", wait_time) or str(wait_time or 10)) + index = data_dict.get("index") or "0" + if index.isdigit(): + index = int(index) + else: + raise ValueError("Index must be an integer.") + text = data_dict.get("text", "").strip() + exact_text_match = data_dict.get("exact_text", "true").lower() == "true" + text_case_sensitive = data_dict.get("case_sensitive_text", "true").lower() == "true" + start_time = time.time() + if not path and text: + while True: + ui_tree = get_ui_tree(app_name) + if not ui_tree: + CommonUtil.ExecLog( + "", "UI tree not found for app_name: %s" % app_name, 3 + ) + return None, app_name + paths = get_paths_by_text( + ui_tree, + text, + exact_match=exact_text_match, + case_sensitive=text_case_sensitive, + ) + + if len(paths) == 0: + if time.time() < start_time + wait_time: + time.sleep(0.5) + continue + else: + CommonUtil.ExecLog("", "No elements found with text: %s" % text, 3) + return None, app_name + if len(paths) == 1: + return paths[0], app_name + else: + parent_paths = get_parent_path_from_paths(paths) + CommonUtil.ExecLog("", "Found paths: %s" % parent_paths, 1) + return parent_paths[index] if parent_paths else None, app_name + return path, app_name + + +@logger +def get_node( + data_dict: dict[str, str], + wait_time=Shared_Resources.Get_Shared_Variables("element_wait"), +) -> Accessible | None: + """Get element using path_string from dataset""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + start_time = time.time() + if not data_dict: + CommonUtil.ExecLog(sModuleInfo, "Data set is empty", 3) + return None + try: + path, app_name = get_path_appname_from_dataset(data_dict) + if not path: + CommonUtil.ExecLog(sModuleInfo, "No path found in the dataset", 3) + return None + if not app_name: + CommonUtil.ExecLog(sModuleInfo, "No app_name found in the dataset", 3) + return None + path = path.strip().replace(" ", ".") # support for space separated paths + + desktop = pyatspi.Registry.getDesktop(0) + target_app = None + for app in desktop: + if app and app.name and app_name in app.name.lower(): + target_app = app + break + if not target_app: + CommonUtil.ExecLog( + sModuleInfo, "No application found with name: %s" % app_name, 3 + ) + return None + try: + indices = [int(i) for i in path.strip().split(".")] + except ValueError: + CommonUtil.ExecLog(sModuleInfo, "Invalid path string: %s" % path, 3) + return None + + node = target_app + for i, index in enumerate(indices): + if index >= len(node): + current_path = ".".join(map(str, indices[:i])) + CommonUtil.ExecLog( + sModuleInfo, + "Index %d out of bounds at %s" % (index, current_path), + 3, + ) + return None + node = node[index] + + if not node: + CommonUtil.ExecLog(sModuleInfo, "No element found at the specified path", 3) + return None + return node + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Error while getting node: {e}", 3) + return None + + +def click_element_by_node(node: Accessible | None) -> Literal["passed", "zeuz_failed"]: + """Click using node, first get the element then click""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + if node is None: + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + + original_node = node + # Use module-level helper get_node_center_coords + + # Use module-level helper click_coords_with_xdotool + + while node: + try: + action_iface = node.queryAction() + if action_iface and action_iface.nActions > 0: + click_action_index: int = -1 + for i in range(action_iface.nActions): + action_name: str = action_iface.getName(i) + if action_name in [ + "click", + "jump", + "press", + "open", + "activate", + "select", + "clickAncestor", + "link.open", + ]: + click_action_index = i + break + + if click_action_index >= 0: + action_name: str = action_iface.getName(click_action_index) + action_iface.doAction(click_action_index) + CommonUtil.ExecLog( + sModuleInfo, f"Clicked element using action: {action_name}", 1 + ) + return "passed" + else: + # No action found on this node: consider clicking via xdotool using node coordinates + # Attempt to compute center coords for the current node + coords = get_node_center_coords(node) + if coords: + # Attempt to get the application name if available + app_name = None + try: + app_acc = node.get_application() + if app_acc and getattr(app_acc, "name", None): + app_name = app_acc.name + except Exception: + app_name = None + if click_coords_with_xdotool(coords, app_name=app_name): + CommonUtil.ExecLog( + sModuleInfo, + f"Clicked element using xdotool at: {coords}", + 1, + ) + return "passed" + else: + CommonUtil.ExecLog( + sModuleInfo, + f"xdotool could not activate the application '{app_name}', aborting click", + 3, + ) + return "zeuz_failed" + else: + node = node.parent + continue + except NotImplementedError: + node = node.parent + continue + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed to click element: {e}", 3) + # try a final attempt using xdotool on the original node + coords = get_node_center_coords(original_node) + if coords: + app_name = None + try: + app_acc = original_node.get_application() + if app_acc and getattr(app_acc, "name", None): + app_name = app_acc.name + except Exception: + app_name = None + if click_coords_with_xdotool(coords, app_name=app_name): + CommonUtil.ExecLog( + sModuleInfo, f"Clicked element using xdotool at: {coords}", 1 + ) + return "passed" + return "zeuz_failed" + + +def get_node_center_coords(node: Accessible) -> tuple[int, int] | None: + """Module-level helper to compute center coordinates of a node in desktop coords.""" + try: + comp = node.queryComponent() + if comp: + pos_func = getattr(comp, "getPosition", None) + size_func = getattr(comp, "getSize", None) + if pos_func and size_func: + x, y = pos_func(pyatspi.DESKTOP_COORDS) + w, h = size_func() + cx = int(x + (w / 2)) + cy = int(y + (h / 2)) + return cx, cy + except Exception: + return None + return None + + +def click_coords_with_xdotool( + coords: tuple[int, int], app_name: str | None = None +) -> bool: + """Module-level helper to click coordinates via xdotool and optionally activate the app window.""" + try: + x, y = coords + if app_name: + try: + winid = _get_window_id_for_app(app_name) + # If we couldn't find the desired window id for the app, do not click + if not winid: + CommonUtil.ExecLog( + MODULE_NAME, f"Could not find a window for app '{app_name}'", 3 + ) + return False + # Try a few methods to activate/raise the window so it's on top + activated = False + try: + # Prefer --sync if available + subprocess.run( + ["xdotool", "windowactivate", "--sync", winid], + capture_output=True, + ) + activated = True + except Exception: + try: + subprocess.run( + ["xdotool", "windowactivate", winid], capture_output=True + ) + activated = True + except Exception: + activated = False + + try: + subprocess.run( + ["xdotool", "windowraise", winid], capture_output=True + ) + except Exception: + # Not critical + pass + + # If wmctrl is available, try using it to activate the window (more reliable on some WMs) + try: + if ( + subprocess.run( + ["which", "wmctrl"], capture_output=True, text=True + ).returncode + == 0 + ): + subprocess.run( + ["wmctrl", "-i", "-a", winid], capture_output=True + ) + activated = True + except Exception: + pass + + # Verify that the requested window is now active; retry a few times + for _ in range(5): + try: + active = subprocess.run( + ["xdotool", "getactivewindow"], + capture_output=True, + text=True, + ).stdout.strip() + if active and active == winid: + activated = True + break + except Exception: + pass + time.sleep(0.1) + if not activated: + CommonUtil.ExecLog( + MODULE_NAME, + f"Failed to activate/raise window {winid} for app '{app_name}'", + 3, + ) + return False + except Exception: + pass + subprocess.run( + ["xdotool", "mousemove", "--sync", str(x), str(y)], + check=True, + capture_output=True, + ) + time.sleep(0.05) + subprocess.run(["xdotool", "click", "1"], check=True, capture_output=True) + return True + except Exception as e: + CommonUtil.ExecLog(MODULE_NAME, f"xdotool click failed: {e}", 3) + return False + + +@logger +def click_element_xdotool(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Click an element using xdotool by computing coordinates from the node in the dataset.""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + data_dict = convert_data_set_to_dict(data_set) + node = get_node(data_dict) + if node is None: + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + + coords = get_node_center_coords(node) + if not coords: + CommonUtil.ExecLog(sModuleInfo, "Could not determine coordinates for node", 3) + return "zeuz_failed" + + app_name = None + try: + app_acc = node.get_application() + if app_acc and getattr(app_acc, "name", None): + app_name = app_acc.name + except Exception: + app_name = None + + # Require app_name to bring it to front before clicking; if we can't determine it, fail + if not app_name: + CommonUtil.ExecLog( + sModuleInfo, "No application context found for xdotool click; aborting", 3 + ) + return "zeuz_failed" + if click_coords_with_xdotool(coords, app_name=app_name): + CommonUtil.ExecLog( + sModuleInfo, f"Clicked element using xdotool at: {coords}", 1 + ) + return "passed" + else: + return "zeuz_failed" + + +@logger +def click_element(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Click using element, first get the element then click""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + data_dict = convert_data_set_to_dict(data_set) + # Check for explicit xdotool method in the dataset + use_xdotool = False + for left, mid, right in data_set: + try: + if ( + mid.strip().lower() == "action" + and left.strip().lower() in ("click method", "method", "click using") + and right.strip().lower() == "xdotool" + ): + use_xdotool = True + except Exception: + continue + node = get_node(data_dict) + if node is None: + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + + try: + if use_xdotool: + return click_element_xdotool(data_set) + return click_element_by_node(node) + except NotImplementedError: + CommonUtil.ExecLog( + sModuleInfo, "This node does not support the Action interface.", 3 + ) + return "zeuz_failed" + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed to click element: {e}", 3) + return "zeuz_failed" + + +def enter_text_in_node( + app_name: str, node: Accessible | None, text: str +) -> Literal["passed", "zeuz_failed"]: + """Enter text using node, first get the element then enter text""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + if node is None: + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + + while node: + try: + editable_iface = node.queryEditableText() + if editable_iface: + editable_iface.setTextContents(text) + CommonUtil.ExecLog(sModuleInfo, f"Entering text: {text}", 1) + return "passed" + elif simulate_keyboard_typing(app_name, node, text): + return "passed" + else: + node = node.parent + continue + except NotImplementedError: + if simulate_keyboard_typing(app_name, node, text): + return "passed" + node = node.parent + continue + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed enter text: {e}", 3) + return "zeuz_failed" + + +@logger +def enter_text(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Enter text using element, first get the element then enter text""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + data_dict = convert_data_set_to_dict(data_set) + app_name = data_dict.get("app_name", "").strip() + text = data_dict.get("text", "").strip() + if not text: + CommonUtil.ExecLog(sModuleInfo, "No text provided to enter", 3) + return "zeuz_failed" + if not app_name: + CommonUtil.ExecLog(sModuleInfo, "No app_name provided to enter text", 3) + return "zeuz_failed" + node = get_node(data_dict) + if node is None: + CommonUtil.ExecLog(sModuleInfo, "Element not found", 3) + return "zeuz_failed" + + try: + return enter_text_in_node(app_name, node, text) + except NotImplementedError: + CommonUtil.ExecLog( + sModuleInfo, "This node does not support the Action interface.", 3 + ) + return "zeuz_failed" + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed to enter text: {e}", 3) + return "zeuz_failed" + + +def parse_desktop_file(desktop_file: str) -> Tuple[Optional[str], Optional[str]]: + """Parse a desktop file to extract Name and Exec command.""" + name = None + exec_cmd = None + + try: + with open(desktop_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line.startswith("Name=") and not line.startswith("Name["): + name = line[5:] + elif line.startswith("Exec="): + exec_cmd = line[5:] + # Remove common exec field codes (%f, %F, %u, %U, etc.) + exec_cmd = ( + exec_cmd.replace("%f", "") + .replace("%F", "") + .replace("%u", "") + .replace("%U", "") + ) + exec_cmd = ( + exec_cmd.replace("%d", "") + .replace("%D", "") + .replace("%n", "") + .replace("%N", "") + ) + exec_cmd = ( + exec_cmd.replace("%i", "") + .replace("%c", "") + .replace("%k", "") + .replace("%v", "") + ) + exec_cmd = exec_cmd.strip() + + # Stop if we found both + if name and exec_cmd: + break + except Exception: + pass + + return name, exec_cmd + + +def find_best_app_match(user_input: str) -> Optional[Tuple[str, str, str]]: + """Find the best matching application and return (key, name, exec_cmd).""" + apps = {} + + try: + desktop_files = glob.glob("/usr/share/applications/*.desktop") + for desktop_file in desktop_files: + name, exec_cmd = parse_desktop_file(desktop_file) + if name and exec_cmd: + # Use the desktop file basename as the key for matching + key = os.path.basename(desktop_file).replace(".desktop", "") + apps[key] = (name, exec_cmd) + except Exception: + pass + + user_lower = user_input.lower() + + for key, (name, exec_cmd) in apps.items(): + if key.lower() == user_lower: + return key, name, exec_cmd + + for key, (name, exec_cmd) in apps.items(): + if name.lower() == user_lower: + return key, name, exec_cmd + + return None + + +@logger +def open_app(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Open application using element, first get the element then open app""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + data_dict = convert_data_set_to_dict(data_set) + app_name = data_dict.get("app_name", "").strip() + + _, matched_app, exec_cmd = find_best_app_match(app_name) or (None, None, None) + + if matched_app: + if matched_app != app_name: + CommonUtil.ExecLog( + MODULE_NAME, f"Best match found: {matched_app} for {app_name}", 1 + ) + try: + # if args: + # command = f"nohup {app_name} {' '.join(args)} >/dev/null 2>&1 &" + # else: + command = f"nohup {exec_cmd} >/dev/null 2>&1 &" + exit_code = os.system(command) + if exit_code == 0: + CommonUtil.ExecLog( + sModuleInfo, + f"Successfully launched '{app_name}' with command: {command}", + 1, + ) + save_latest_app_name(app_name) + return "passed" + else: + CommonUtil.ExecLog( + sModuleInfo, + f"Failed to launch '{app_name}' (exit code: {exit_code})", + 3, + ) + return "zeuz_failed" + + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Error launching '{app_name}': {e}", 3) + return "zeuz_failed" + else: + try: + command = f"nohup {app_name} >/dev/null 2>&1 &" + exit_code = os.system(command) + if exit_code == 0: + CommonUtil.ExecLog( + sModuleInfo, + f"Successfully launched '{app_name}' with command: {command}", + 1, + ) + save_latest_app_name(app_name) + return "passed" + else: + CommonUtil.ExecLog( + sModuleInfo, + f"Failed to launch '{app_name}' (exit code: {exit_code})", + 3, + ) + return "zeuz_failed" + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Error launching '{app_name}': {e}", 3) + return "zeuz_failed" + + +def get_process_ids(app_name: str) -> List[str]: + """Get process ID of the application by name""" + try: + process_ids = ( + subprocess.run(["pgrep", "-f", app_name], capture_output=True, text=True) + .stdout.strip() + .splitlines() + ) + return [pid for pid in process_ids if pid.isdigit()] + except Exception as e: + CommonUtil.ExecLog( + MODULE_NAME, f"Error getting process ID for '{app_name}': {e}", 3 + ) + return [] + + +@logger +def close_app(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Close application using element, first get the element then close app""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + data_dict = convert_data_set_to_dict(data_set) + app_name = data_dict.get("app_name", "").strip() + + app_key, matched_app, exec_cmd = find_best_app_match(app_name) or (None, None, None) + if app_key: + app_key = app_key.split(".")[-1] + + if matched_app: + if matched_app != app_name: + CommonUtil.ExecLog( + MODULE_NAME, f"Best match found: {matched_app} for {app_name}", 1 + ) + try: + # get the process ID of the application + process_ids = get_process_ids(matched_app) + if not process_ids and exec_cmd: + process_ids = get_process_ids(exec_cmd) + if not process_ids and app_key: + process_ids = get_process_ids(app_key) + if not process_ids: + CommonUtil.ExecLog( + sModuleInfo, + f"No running process found for Name: '{app_name}', Key: '{app_key}', Command: '{exec_cmd}'", + 3, + ) + return "zeuz_failed" + + # kill the process + for pid in process_ids: + command = f"kill -9 {pid}" + CommonUtil.ExecLog( + sModuleInfo, + f"Closing application '{matched_app}' with command: {command}", + 1, + ) + exit_code = os.system(command) + if exit_code != 0: + CommonUtil.ExecLog( + sModuleInfo, + f"Failed to close application '{matched_app}' with PID {pid} (exit code: {exit_code})", + 3, + ) + return "zeuz_failed" + if process_ids: + CommonUtil.ExecLog( + sModuleInfo, + f"Successfully closed application '{matched_app}' with PIDs: {', '.join(process_ids)}", + 1, + ) + return "passed" + else: + CommonUtil.ExecLog( + sModuleInfo, f"No running process found for '{matched_app}'", 3 + ) + return "zeuz_failed" + + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Error launching '{app_name}': {e}", 3) + return "zeuz_failed" + else: + CommonUtil.ExecLog( + MODULE_NAME, f"No matching application found for '{app_name}'", 3 + ) + return "zeuz_failed" + + +@logger +def wait_for_element(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + data_dict = convert_data_set_to_dict(data_set) + try: + timeout_duration = 10 + appear_condition = True + for left, mid, right in data_set: + if mid.strip().lower() == "action": + if left.strip().lower() == "wait to disappear": + appear_condition = False + timeout_duration = int(right.strip()) + + end_time = time.time() + timeout_duration + while time.time() <= end_time: + node = get_node(data_dict, 0) + if appear_condition and node: # Element found + CommonUtil.ExecLog(sModuleInfo, "Found element", 1) + return "passed" + elif not appear_condition and not node: # Element removed + CommonUtil.ExecLog(sModuleInfo, "Element disappeared", 1) + return "passed" + time.sleep(1) + + CommonUtil.ExecLog(sModuleInfo, "Wait for element failed", 3) + return "zeuz_failed" + + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Error while waiting for element", 3) + return "zeuz_failed" + + +def get_attribute_value(tag_str: str, attr_name: str) -> str | None: + pattern = rf'{attr_name}="(.*?)"' + match = re.search(pattern, tag_str) + return match.group(1) if match else None + + +@logger +def save_attribute(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + data_dict = convert_data_set_to_dict(data_set) + try: + variable_name = "" + field = "value" + for left, mid, right in data_set: + if mid.strip().lower() == "save parameter": + field = left.replace(" ", "").lower() + variable_name = right.strip() + + node = get_node(data_dict) + if node is None: + return "zeuz_failed" + tag_str = (dump_node(node, recursive=False) or ["", ""])[0] + actual_text = get_attribute_value(tag_str, field) + + if actual_text is None: + CommonUtil.ExecLog( + sModuleInfo, f"Attribute '{field}' not found in the element", 3 + ) + return "zeuz_failed" + + Shared_Resources.Set_Shared_Variables(variable_name, actual_text) + CommonUtil.ExecLog( + sModuleInfo, + f"Text '{actual_text}' is saved in the variable '{variable_name}'", + 1, + ) + return "passed" + except Exception: + CommonUtil.ExecLog(sModuleInfo, "Error while saving attribute", 3) + return "zeuz_failed" + + +def is_valid_hotkey(hotkey: str) -> bool: + """ + Validates hotkey: all lowercase, only modifiers + known keys. + """ + MODIFIERS = {"ctrl", "alt", "shift", "super", "meta"} + KEYS = { + "return", + "enter", + "tab", + "escape", + "esc", + "backspace", + "delete", + "up", + "down", + "left", + "right", + "home", + "end", + "page_up", + "page_down", + "insert", + "space", + *(f"f{i}" for i in range(1, 13)), + } + parts = hotkey.split("+") + if not parts: + return False + + *mods, key = parts + if any(mod not in MODIFIERS for mod in mods): + return False + + return key in KEYS or (len(key) == 1 and key.isalnum()) + + +def send_hotkey(hotkey: str) -> bool: + """ + Sends a lowercase hotkey (e.g., 'ctrl+a', 'alt+f4') using xdotool. + """ + hotkey = hotkey.lower() + + if not is_valid_hotkey(hotkey): + CommonUtil.ExecLog( + MODULE_NAME, f"Invalid hotkey: {hotkey}. Space is not allowed", 3 + ) + return False + + try: + subprocess.run(["xdotool", "key", hotkey], check=True, capture_output=True) + return True + except subprocess.CalledProcessError as e: + CommonUtil.ExecLog( + MODULE_NAME, f"Error sending hotkey '{hotkey}': {e.stderr.decode()}", 3 + ) + return False + + +@logger +def send_keystroke(data_set: DataSet) -> Literal["passed", "zeuz_failed"]: + """Insert characters - mainly key combinations and single key presses.""" + frame = inspect.currentframe() + sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME + + keystroke_value = "" + keystroke_char = "" + for left, mid, right in data_set: + left = left.strip().lower() + if "action" in mid.lower(): + if left == "keystroke keys": + keystroke_value = right.strip().lower() + elif left == "keystroke chars": + keystroke_char = right.strip().lower() + + if keystroke_value == "" and keystroke_char == "": + CommonUtil.ExecLog(sModuleInfo, "Invalid action found", 3) + return "zeuz_failed" + + if keystroke_value != "" and keystroke_char != "": + CommonUtil.ExecLog( + sModuleInfo, + "Both keystroke keys and characters are provided, only one is supported", + 3, + ) + return "zeuz_failed" + + success = True + if keystroke_char != "": + success = simulate_keyboard_typing(None, None, keystroke_char) + else: + key_combinations = keystroke_value.split(",") + for hotkey in key_combinations: + success_tem = send_hotkey(hotkey.strip()) + if not success: + CommonUtil.ExecLog( + sModuleInfo, f"Failed to send hotkey: {hotkey.strip()}", 3 + ) + success = success and success_tem + if success: + return "passed" + else: + CommonUtil.ExecLog(sModuleInfo, "Failed to send some or all keystroke", 3) + return "zeuz_failed" diff --git a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/info.py b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/info.py index 2ac845dc1..8beefa530 100644 --- a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/info.py +++ b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/info.py @@ -7,6 +7,7 @@ selenium, utility, windows, + linux, xml, database, performance, @@ -21,6 +22,7 @@ selenium, utility, windows, + linux, xml, database, performance, diff --git a/Framework/Built_In_Automation/Sequential_Actions/action_declarations/linux.py b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/linux.py new file mode 100644 index 000000000..d0445b9a2 --- /dev/null +++ b/Framework/Built_In_Automation/Sequential_Actions/action_declarations/linux.py @@ -0,0 +1,16 @@ +declarations = ( + { "name": "open app", "function": "open_app", "screenshot": "desktop" }, + { "name": "close app", "function": "close_app", "screenshot": "desktop" }, + { "name": "click", "function": "click_element", "screenshot": "desktop" }, + { "name": "text", "function": "enter_text", "screenshot": "desktop" }, + { "name": "wait to appear", "function": "wait_for_element", "screenshot": "desktop" }, + { "name": "wait to disappear","function": "wait_for_element", "screenshot": "desktop" }, + { "name": "save attribute", "function": "save_attribute", "screenshot": "desktop" }, + { "name": "keystroke keys", "function": "send_keystroke", "screenshot": "desktop" }, + { "name": "keystroke chars", "function": "send_keystroke", "screenshot": "desktop" }, +) + +module_name = "linux" + +for dec in declarations: + dec["module"] = module_name diff --git a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py index c4991d745..7a6fcc3cb 100755 --- a/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py +++ b/Framework/Built_In_Automation/Sequential_Actions/sequential_actions.py @@ -142,6 +142,11 @@ def load_sa_modules( from Framework.Built_In_Automation.Desktop.Windows import ( BuiltInFunctions as windows, ) + elif module == "linux": + global linux + from Framework.Built_In_Automation.Desktop.Linux import ( + BuiltInFunctions as linux, + ) elif module == "performance": global performance from Framework.Built_In_Automation.Performance_Testing import ( diff --git a/Framework/nodejs_appium_installer.py b/Framework/nodejs_appium_installer.py index 9c30d39f0..0cd43beb9 100644 --- a/Framework/nodejs_appium_installer.py +++ b/Framework/nodejs_appium_installer.py @@ -4,6 +4,7 @@ import subprocess import tarfile import zipfile +import shutil from pathlib import Path import json import requests @@ -99,7 +100,7 @@ def install_nodejs(): print("Downloading Node.js...") response = requests.get(url, verify=False) response.raise_for_status() - with open(archive_path, 'wb') as out_file: + with open(archive_path, "wb") as out_file: out_file.write(response.content) try: @@ -196,7 +197,9 @@ def check_appium_drivers(): text=True, ) drivers_data = json.loads(result.stdout) - return [name for name, info in drivers_data.items() if info.get("installed", False)] + return [ + name for name, info in drivers_data.items() if info.get("installed", False) + ] except: # noqa: E722 return [] @@ -213,7 +216,9 @@ def check_installations(): if npm_path.exists(): try: result = subprocess.run( - [str(npm_path), "list", "-g", "--json", "appium"], capture_output=True, text=True + [str(npm_path), "list", "-g", "--json", "appium"], + capture_output=True, + text=True, ) npm_data = json.loads(result.stdout) appium_installed = "appium" in npm_data.get("dependencies", {}) @@ -234,9 +239,73 @@ def install_missing_drivers(missing_drivers): install_drivers(missing_drivers) +def check_and_remove_global_appium(): + """Check for and remove existing global Appium installations not managed by us.""" + print("Checking for conflicting global Appium installations...") + + # Method 1: Check using 'which appium' + appium_bin = shutil.which("appium") + if appium_bin: + appium_path = Path(appium_bin).resolve() + node_dir = get_node_dir().resolve() + + try: + # Check if appium is within our node directory + appium_path.relative_to(node_dir) + # If it is, we are good + except ValueError: + print(f"Found conflicting Appium at {appium_path}") + print("Uninstalling old Appium version...") + try: + is_windows = platform.system() == "Windows" + subprocess.run( + ["npm", "uninstall", "-g", "appium"], check=True, shell=is_windows + ) + print("Successfully uninstalled conflicting Appium") + except Exception as e: + print(f"Warning: Failed to uninstall conflicting Appium: {e}") + return + + # Method 2: Check using 'npm list -g appium' (if npm is available in system path) + # This catches cases where appium is installed but not in PATH + npm_bin = shutil.which("npm") + if npm_bin: + try: + # Check if this npm is ours + npm_path = Path(npm_bin).resolve() + node_dir = get_node_dir().resolve() + try: + npm_path.relative_to(node_dir) + # If it is our npm, skip this check as we handle our own appium + return + except ValueError: + pass + + # Check for global appium using system npm + is_windows = platform.system() == "Windows" + result = subprocess.run( + ["npm", "list", "-g", "--json", "appium"], + capture_output=True, + text=True, + shell=is_windows, + ) + npm_data = json.loads(result.stdout) + if "appium" in npm_data.get("dependencies", {}): + print("Found conflicting Appium in global npm modules") + print("Uninstalling old Appium version...") + subprocess.run( + ["npm", "uninstall", "-g", "appium"], check=True, shell=is_windows + ) + print("Successfully uninstalled conflicting Appium") + except Exception as e: + # Don't fail if npm check fails, just log warning + print(f"Warning: Failed to check/uninstall global Appium via npm: {e}") + + def setup_nodejs_appium(): """Main setup function.""" try: + check_and_remove_global_appium() update_path() # Ensure Node.js is in PATH from the start print("Checking Node.js and Appium installation...") diff --git a/Installer/setup_linux_inspector.sh b/Installer/setup_linux_inspector.sh new file mode 100644 index 000000000..578bc156f --- /dev/null +++ b/Installer/setup_linux_inspector.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash + +set -e + +# List of required packages for each package manager +APT_PACKAGES=( + build-essential + cmake + pkg-config + libgirepository1.0-dev + libcairo2-dev + xdotool + x11-apps # provides xwd + imagemagick # provides convert, import + wmctrl +) + +DNF_PACKAGES=( + cmake + pkgconf-pkg-config + gobject-introspection-devel + cairo-devel + xdotool + xorg-x11-utils # provides xwd on some distros + ImageMagick + wmctrl + python3-devel + cairo-gobject-devel +) + +PACMAN_PACKAGES=( + gcc + meson + cmake + pkgconf + cairo + xdotool + gobject-introspection + imagemagick + wmctrl +) + +BREW_PACKAGES=( + cmake + pkg-config + cairo + xdotool + gobject-introspection + imagemagick +) + +# Function to join array into space-separated string +join_packages() { + local IFS=" " + echo "$*" +} + +# Detect and install using the available package manager +if command -v apt >/dev/null 2>&1; then + echo "Using APT (Debian/Ubuntu-based)" + sudo apt update + sudo apt install -y $(join_packages "${APT_PACKAGES[@]}") + +elif command -v dnf >/dev/null 2>&1; then + echo "Using DNF (Fedora-based)" + sudo dnf install -y $(join_packages "${DNF_PACKAGES[@]}") + +elif command -v pacman >/dev/null 2>&1; then + echo "Using Pacman (Arch-based)" + sudo pacman -Sy --noconfirm $(join_packages "${PACMAN_PACKAGES[@]}") + +elif command -v brew >/dev/null 2>&1; then + echo "Using Homebrew (macOS)" + brew install $(join_packages "${BREW_PACKAGES[@]}") + +else + echo "❌ No supported package manager found (apt, dnf, pacman, brew)" + exit 1 +fi + +echo "✅ Installation complete." + +# Post-install sanity checks for screenshot/capture utilities +echo "\nChecking availability of key utilities:" +REQUIRED_TOOLS=(xdotool xwd convert import wmctrl) +for t in "${REQUIRED_TOOLS[@]}"; do + if command -v "$t" >/dev/null 2>&1; then + echo " - $t: available" + else + echo " - $t: NOT FOUND" + fi +done + +echo "If any of the above are missing, please install them for full Linux inspector functionality." diff --git a/node_cli.py b/node_cli.py index 64201eb80..9e9bc6f75 100755 --- a/node_cli.py +++ b/node_cli.py @@ -3,6 +3,7 @@ import socket import sys import shutil +import subprocess from pathlib import Path from typing import Any from urllib.parse import urlparse @@ -38,6 +39,7 @@ from Framework.install_handler.long_poll_handler import InstallHandler from server.mobile import upload_android_ui_dump + def adjust_python_path(): """Adjusts the Python path to include the Framework directory.""" root_dir = Path(__file__).parent @@ -50,9 +52,9 @@ def adjust_python_path(): os.chdir(framework_dir) - from Framework.module_installer import ( # noqa: E402 check_min_python_version, + install_missing_modules, update_outdated_modules, # install_missing_modules, ) @@ -68,10 +70,10 @@ def adjust_python_path(): async def start_server(): - def is_port_in_use(port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(("127.0.0.1", port)) == 0 + try: node_server_port = 18100 tries = 0 @@ -80,7 +82,7 @@ def is_port_in_use(port): tries += 1 ConfigModule.add_config_value("server", "port", str(node_server_port)) print(f"Launching node-server on port {node_server_port}") - + app = node_server.main() config = uvicorn.Config( app, @@ -95,6 +97,7 @@ def is_port_in_use(port): traceback.print_exc() print(f"[WARN] Failed to launch node-server: {str(e)}") + def kill_old_process(pid_file_path: os.PathLike): """kill any process that is running from the same node folder.""" import psutil @@ -122,6 +125,7 @@ def setup_nodejs_appium(): """Setup Node.js and Appium if not already installed.""" try: import nodejs_appium_installer + nodejs_appium_installer.setup_nodejs_appium() except Exception as e: print(f"Warning: Failed to setup Node.js and Appium: {e}") @@ -146,6 +150,7 @@ def setup_nodejs_appium(): TEAM_TAG = "team" device_dict: dict[str, Any] = {} + def kill_child_processes(): try: parent = psutil.Process() @@ -167,9 +172,6 @@ def signal_handler(sig, frame): os._exit(0) - - - async def destroy_session(): """ Destroy session file. @@ -472,11 +474,11 @@ async def cancel_callback(): cancel_callback=cancel_callback, done_callback=done_callback, ) - + deploy_task = asyncio.create_task(deploy_handler.run(deploy_srv_addr())) - + await asyncio.gather(install_task, deploy_task, return_exceptions=True) - + return False except Exception: @@ -618,34 +620,37 @@ def get_folder_creation_time(folder_path): def generate_rsa_key(): """Generate a new RSA private key and save it to the rsa_private_keys folder.""" console = Console() - - from Framework.Utilities.RSAKeyUtil import save_private_key as save_key_util, get_public_key_pem + + from Framework.Utilities.RSAKeyUtil import ( + save_private_key as save_key_util, + get_public_key_pem, + ) key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR - + private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) - + timestamp = dt.now().strftime("%Y%m%d_%H%M%S") key_filename = f"private_key_{timestamp}.pem" - + success, _, saved_path = save_key_util( private_key=private_key, key_folder=key_folder, filename=key_filename, format_type="pkcs8", - check_duplicate=False # New keys shouldn't have duplicates + check_duplicate=False, # New keys shouldn't have duplicates ) - + if not success: console.print(f"[red]Error:[/red] Failed to save generated private key.") return - + # Generate public key public_key_pem = get_public_key_pem(private_key) - + console.print(f"\n[green]✓[/green] RSA private key generated successfully!") console.print(f"[cyan]Location:[/cyan] {saved_path}") console.print(f"\n[cyan]Public Key:[/cyan]") @@ -656,45 +661,52 @@ def add_existing_rsa_key(key_content: str): """Copy an existing RSA private key to the rsa_private_keys folder.""" console = Console() - from Framework.Utilities.RSAKeyUtil import load_private_key_from_pem, save_private_key as save_key_util, get_public_key_pem, check_duplicate_key + from Framework.Utilities.RSAKeyUtil import ( + load_private_key_from_pem, + save_private_key as save_key_util, + get_public_key_pem, + check_duplicate_key, + ) key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR - + private_key = load_private_key_from_pem(key_content) if private_key is None: console.print(f"[red]Error:[/red] Invalid PEM private key.") return False - + # Check for duplicates duplicate = check_duplicate_key(private_key, key_folder) if duplicate: - console.print(f"[yellow]Warning:[/yellow] This private key already exists as {duplicate}. Not adding duplicate.") + console.print( + f"[yellow]Warning:[/yellow] This private key already exists as {duplicate}. Not adding duplicate." + ) return False - + # Copy the key with a timestamp timestamp = dt.now().strftime("%Y%m%d_%H%M%S") new_filename = f"imported_key_{timestamp}.pem" - + success, _, saved_path = save_key_util( private_key=private_key, key_folder=key_folder, filename=new_filename, format_type="pkcs8", - check_duplicate=False # Already checked above + check_duplicate=False, # Already checked above ) - + if not success: console.print(f"[red]Error:[/red] Failed to save private key.") return False - + console.print(f"\n[green]✓[/green] Private key imported successfully!") console.print(f"[cyan]To:[/cyan] {saved_path}") - + # Show the public key public_key_pem = get_public_key_pem(private_key) console.print(f"\n[cyan]Public Key:[/cyan]") console.print(public_key_pem) - + return True @@ -707,137 +719,189 @@ def show_existing_rsa_keys(): key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR keys_info = list_existing_keys(key_folder) - + if not keys_info: console.print(f"\n[yellow]No private keys found in:[/yellow] {key_folder}") console.print("[cyan]Use -gpk to generate a new key[/cyan]\n") return - - console.print(f"\n[cyan]Found {len(keys_info)} private key(s) in:[/cyan] {key_folder}\n") - + + console.print( + f"\n[cyan]Found {len(keys_info)} private key(s) in:[/cyan] {key_folder}\n" + ) + for idx, key_info in enumerate(keys_info, 1): console.print(f"[green]Key #{idx}:[/green] {key_info['filename']}") console.print(f"[cyan]Path:[/cyan] {key_info['path']}") - if 'error' in key_info: + if "error" in key_info: console.print(f"[red]Error:[/red] {key_info['error']}\n") else: console.print(f"[cyan]Public Key:[/cyan]") - console.print(key_info['public_key']) + console.print(key_info["public_key"]) def share_private_keys(): """Share all RSA private keys by encrypting and uploading to server.""" console = Console() - + try: from Framework.Utilities import ShareKeysUtil from Framework.Utilities import RequestFormatter, ConfigModule - + key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR - + # Collect all private keys keys = ShareKeysUtil.collect_private_keys(key_folder) - + if not keys: console.print(f"\n[red]✗[/red] No private keys found in: {key_folder}") console.print("[cyan]Use -gpk to generate a new key first[/cyan]\n") return False - + console.print(f"\n[cyan]Found {len(keys)} private key(s) to share[/cyan]") - + # Generate share code share_code = ShareKeysUtil.generate_share_code() - + # Encrypt all keys console.print("[cyan]Encrypting keys...[/cyan]") keys_json = json.dumps(keys) encrypted_data = ShareKeysUtil.encrypt_data(keys_json, share_code) - + # Send to server console.print("[cyan]Uploading to server...[/cyan]") - - payload = { - "code": share_code, - "encrypted_data": encrypted_data - } - + + payload = {"code": share_code, "encrypted_data": encrypted_data} + console.print(f"[dim]Endpoint: /zsvc/deploy/v1/share-keys[/dim]") response = RequestFormatter.Post("zsvc/deploy/v1/share-keys", payload) - + if response and response.get("success"): console.print(f"\n[green]✓[/green] Keys shared successfully!") console.print(f"\n[yellow]═══════════════════════════════════[/yellow]") console.print(f"[yellow] Share Code: [bold]{share_code}[/bold][/yellow]") console.print(f"[yellow]═══════════════════════════════════[/yellow]") console.print(f"\n[cyan]This code will expire in 30 minutes.[/cyan]") - console.print(f"[cyan]Use this code with -fe option to fetch keys on another machine.[/cyan]\n") + console.print( + f"[cyan]Use this code with -fe option to fetch keys on another machine.[/cyan]\n" + ) console.print(f"[dim]Example: uv run node_cli.py -fe {share_code}[/dim]\n") return True else: - error_msg = response.get("message", "Unknown error") if response else "No response from server" + error_msg = ( + response.get("message", "Unknown error") + if response + else "No response from server" + ) console.print(f"\n[red]✗[/red] Failed to share keys: {error_msg}\n") return False - + except Exception as e: import traceback as tb + console.print(f"\n[red]✗[/red] Error sharing keys: {str(e)}\n") tb.print_exc() return False +def install_linux_inspector_deps(): + """Install Linux inspector dependencies by running setup script.""" + console = Console() + + script_path = Path(__file__).parent / "Installer" / "setup_linux_inspector.sh" + + if not script_path.exists(): + console.print(f"\n[red]✗[/red] Setup script not found at: {script_path}\n") + return False + + console.print(f"\n[cyan]Installing Linux inspector dependencies...\n[/cyan]") + console.print(f"[dim]Running: {script_path}[/dim]\n") + + try: + result = subprocess.run( + ["bash", str(script_path)], capture_output=False, text=True, check=False + ) + + if result.returncode == 0: + console.print( + f"\n[green]✓[/green] Linux inspector dependencies installed successfully!\n" + ) + return True + else: + console.print( + f"\n[red]✗[/red] Installation failed with exit code: {result.returncode}\n" + ) + return False + + except Exception as e: + console.print(f"\n[red]✗[/red] Error running installation script: {str(e)}\n") + return False + + def fetch_private_keys(share_code: str): """Fetch and decrypt shared RSA private keys from server.""" console = Console() - + from Framework.Utilities import ShareKeysUtil from Framework.Utilities import RequestFormatter - + # Validate code format - if len(share_code) != 9 or share_code[4] != '-': - console.print(f"\n[red]✗[/red] Invalid share code format. Expected format: AkEf-B910 (9 characters with dash)\n") + if len(share_code) != 9 or share_code[4] != "-": + console.print( + f"\n[red]✗[/red] Invalid share code format. Expected format: AkEf-B910 (9 characters with dash)\n" + ) return False - + console.print(f"\n[cyan]Fetching keys from server...[/cyan]") - + try: # Fetch from server response = RequestFormatter.Get(f"zsvc/deploy/v1/fetch-keys/{share_code}") - + if not response or not response.get("success"): - error_msg = response.get("message", "Keys not found or expired") if response else "No response from server" + error_msg = ( + response.get("message", "Keys not found or expired") + if response + else "No response from server" + ) console.print(f"\n[red]✗[/red] Failed to fetch keys: {error_msg}\n") return False - + encrypted_data = response.get("encrypted_data") if not encrypted_data: console.print(f"\n[red]✗[/red] No encrypted data received from server\n") return False - + # Decrypt data console.print("[cyan]Decrypting keys...[/cyan]") decrypted_json = ShareKeysUtil.decrypt_data(encrypted_data, share_code) - + if not decrypted_json: - console.print(f"\n[red]✗[/red] Failed to decrypt keys. Invalid share code or corrupted data.\n") + console.print( + f"\n[red]✗[/red] Failed to decrypt keys. Invalid share code or corrupted data.\n" + ) return False - + keys = json.loads(decrypted_json) - + # Save keys console.print(f"[cyan]Saving {len(keys)} key(s)...[/cyan]") key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR - success_count, skipped_count, failed_count = ShareKeysUtil.save_private_keys(keys, key_folder) - + success_count, skipped_count, failed_count = ShareKeysUtil.save_private_keys( + keys, key_folder + ) + console.print(f"\n[green]✓[/green] Keys fetched successfully!") console.print(f"[cyan]Saved:[/cyan] {success_count} key(s)") if skipped_count > 0: - console.print(f"[yellow]Skipped:[/yellow] {skipped_count} key(s) (duplicates)") + console.print( + f"[yellow]Skipped:[/yellow] {skipped_count} key(s) (duplicates)" + ) if failed_count > 0: console.print(f"[red]Failed:[/red] {failed_count} key(s)") console.print(f"[cyan]Location:[/cyan] {key_folder}\n") - + return True - + except json.JSONDecodeError as e: console.print(f"\n[red]✗[/red] Error parsing decrypted data: {str(e)}\n") return False @@ -846,9 +910,9 @@ def fetch_private_keys(share_code: str): return False - # Delete Old Subfolders in Automationlog folder. + def get_subfolders_created_before_n_days(folder_path, log_delete_interval): subfolder_paths = [] current_time = time.time() @@ -958,6 +1022,9 @@ async def command_line_args() -> Path | None: Example 14 - Fetch shared RSA private keys using a share code: python node_cli.py -fe AkEf-B910 + Example 15 - Install Linux desktop automation dependencies: + python node_cli.py -ild + Use -h or --help to see full documentation of all available arguments. """ # try: @@ -1071,6 +1138,14 @@ async def command_line_args() -> Path | None: metavar="", ) + # Desktop automation setup and UI inspection arguments + parser_object.add_argument( + "-ild", + "--install-linux-deps", + action="store_true", + help="Install Linux desktop automation dependencies (runs Installer/setup_linux_inspector.sh)", + ) + all_arguments = parser_object.parse_args() server = all_arguments.server @@ -1096,17 +1171,20 @@ async def command_line_args() -> Path | None: share_keys = all_arguments.share fetch_code = all_arguments.fetch + # Desktop automation and UI inspection options + install_linux_deps = all_arguments.install_linux_deps + # Handle RSA key management commands if generate_key: generate_rsa_key() sys.exit(0) - + if add_key: if add_existing_rsa_key(add_key): sys.exit(0) else: sys.exit(1) - + if show_keys: show_existing_rsa_keys() sys.exit(0) @@ -1115,12 +1193,17 @@ async def command_line_args() -> Path | None: if share_keys: share_private_keys() sys.exit(0) - + if fetch_code: if fetch_private_keys(fetch_code): sys.exit(0) else: sys.exit(1) + if install_linux_deps: + if install_linux_inspector_deps(): + sys.exit(0) + else: + sys.exit(1) # Update chrome extension download settings if specified if chrome_fetch is not None: @@ -1206,6 +1289,7 @@ async def set_new_credentials(server, api_key): ConfigModule.remove_config_value(AUTHENTICATION_TAG, "server_address") ConfigModule.add_config_value(AUTHENTICATION_TAG, "server_address", server) + def print_system_info_version(): """Prints the system information and version of the Node""" print( @@ -1222,7 +1306,10 @@ def create_temp_ini_automation_log(): automation_log_dir.mkdir(exist_ok=True) print(f"Created AutomationLog directory at {automation_log_dir}") - TMP_INI_FILE = automation_log_dir / ConfigModule.get_config_value("Advanced Options", "_file") + TMP_INI_FILE = automation_log_dir / ConfigModule.get_config_value( + "Advanced Options", "_file" + ) + async def main(): print_system_info_version() @@ -1275,9 +1362,7 @@ async def main(): .strip() ) api = ( - ConfigModule.get_config_value(AUTHENTICATION_TAG, "api-key") - .strip('"') - .strip() + ConfigModule.get_config_value(AUTHENTICATION_TAG, "api-key").strip('"').strip() ) if len(server_name) == 0 and len(api) == 0: @@ -1289,10 +1374,12 @@ async def main(): await asyncio.sleep(1) else: - asyncio.create_task(Login( - server_name=server_name, - log_dir=log_dir, - )) + asyncio.create_task( + Login( + server_name=server_name, + log_dir=log_dir, + ) + ) while True: if STATE.reconnect_with_credentials is not None: await destroy_session() @@ -1319,10 +1406,13 @@ async def main(): ) console.print("Please log in to ZeuZ server and connect.") - asyncio.create_task(Login( - server_name=server_name, - log_dir=log_dir, - )) + asyncio.create_task( + Login( + server_name=server_name, + log_dir=log_dir, + ) + ) await asyncio.sleep(1) -asyncio.run(main()) \ No newline at end of file + +asyncio.run(main()) diff --git a/server/linux.py b/server/linux.py new file mode 100644 index 000000000..63b1baba3 --- /dev/null +++ b/server/linux.py @@ -0,0 +1,105 @@ +import hashlib +import os +import base64 +import asyncio +import requests +from typing import Literal +from fastapi import APIRouter +from pydantic import BaseModel + +from Framework.Utilities import ConfigModule, CommonUtil + + +router = APIRouter(prefix="/linux", tags=["linux"]) + +SCREENSHOT_PATH = "linux_screen.png" + +class InspectorResponse(BaseModel): + """Response model for the /inspector endpoint.""" + status: Literal["ok", "error"] = "ok" + ui_xml: str | None = None + screenshot: str | None = None # Base64 encoded image + error: str | None = None + + +@router.get("/inspect") +def inspect(app_name: str | None = None): + """Get the Linux UI DOM and screenshot.""" + from Framework.Built_In_Automation.Desktop.Linux import BuiltInFunctions + if BuiltInFunctions is None: + return InspectorResponse(status="error", error="Linux automation module not available") + + try: + # Determine app name + target_app = app_name + if not target_app: + target_app = BuiltInFunctions.get_latest_app_name() + + if not target_app: + return InspectorResponse(status="error", error="No application specified and no latest app found.") + + # Capture UI + xml_content = BuiltInFunctions.get_ui_tree(target_app) + if not xml_content: + return InspectorResponse(status="error", error=f"Failed to get UI tree for app: {target_app}") + + # Capture Screenshot + full_screenshot_path = os.path.abspath(SCREENSHOT_PATH) + + screenshot_base64 = None + if BuiltInFunctions.capture_screenshot(full_screenshot_path, target_app): + try: + with open(full_screenshot_path, 'rb') as img_file: + screenshot_bytes = img_file.read() + screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8') + except Exception: + pass + + return InspectorResponse( + status="ok", + ui_xml=xml_content, + screenshot=screenshot_base64 + ) + except Exception as e: + return InspectorResponse( + status="error", + error=str(e) + ) + + +async def upload_linux_ui_dump(): + """Continuously upload Linux UI dump if changed.""" + from Framework.Built_In_Automation.Desktop.Linux import BuiltInFunctions + if BuiltInFunctions is None: + return + + prev_xml_hash = "" + while True: + try: + target_app = BuiltInFunctions.get_latest_app_name() + if target_app: + xml_content = BuiltInFunctions.get_ui_tree(target_app) + if xml_content: + new_xml_hash = hashlib.sha256(xml_content.encode('utf-8')).hexdigest() + + if prev_xml_hash != new_xml_hash: + prev_xml_hash = new_xml_hash + + url = ConfigModule.get_config_value("Authentication", "server_address").strip() + "/node_ai_contents/" + apiKey = ConfigModule.get_config_value("Authentication", "api-key").strip() + + res = requests.post( + url, + headers={"X-Api-Key": apiKey}, + json={ + "dom_linux": {"dom": xml_content}, + "node_id": CommonUtil.MachineInfo().getLocalUser().lower(), + "app_name": target_app # Extra info might be useful + } + ) + if res.ok: + CommonUtil.ExecLog("", "Linux UI dump uploaded successfully", iLogLevel=1) + except Exception as e: + CommonUtil.ExecLog("", f"Error uploading Linux UI dump: {str(e)}", iLogLevel=3) + + await asyncio.sleep(5) diff --git a/server/main.py b/server/main.py index 3a8f7a07a..96213cfdf 100644 --- a/server/main.py +++ b/server/main.py @@ -10,6 +10,7 @@ from server.node_operator import router as operator_router from server.mobile import router as mobile_router, upload_android_ui_dump from server.mac import router as mac_router +from server.linux import router as linux_router import asyncio class EndpointFilter(logging.Filter): @@ -40,6 +41,7 @@ def main() -> FastAPI: v1router.include_router(evaluator_router) v1router.include_router(mobile_router) v1router.include_router(mac_router) + v1router.include_router(linux_router) app = FastAPI() app.include_router(v1router)