diff --git a/Framework/Built_In_Automation/Mobile/CrossPlatform/Appium/BuiltInFunctions.py b/Framework/Built_In_Automation/Mobile/CrossPlatform/Appium/BuiltInFunctions.py index ae3ae20f3..32705eb4c 100755 --- a/Framework/Built_In_Automation/Mobile/CrossPlatform/Appium/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Mobile/CrossPlatform/Appium/BuiltInFunctions.py @@ -933,6 +933,7 @@ def start_appium_driver( if str(appium_details[device_id]["type"]).lower() == "android": # All Desired caps = https://appium.io/docs/en/writing-running-appium/caps/ desired_caps["platformName"] = appium_details[device_id]["type"] # Set platform name + desired_caps["udid"] = appium_details[device_id]["serial"] desired_caps["autoLaunch"] = "false" # Do not launch application desired_caps["fullReset"] = "false" # Do not reinstall application if no_reset: diff --git a/Framework/MainDriverApi.py b/Framework/MainDriverApi.py index 31a0efff9..f4dca8d34 100644 --- a/Framework/MainDriverApi.py +++ b/Framework/MainDriverApi.py @@ -1260,6 +1260,7 @@ def send_dom_variables(): def set_device_info_according_to_user_order(device_order, device_dict, test_case_no, test_case_name, user_info_object, Userid, **kwargs): # Need to set device_info for browserstack here global device_info + device_order = [[i, i] for i in range(1, len(device_dict) + 1)] # overriding zsvc return device order. zsvc only returns [[1,1]] without considering the number of devices available shared.Set_Shared_Variables("device_order", device_order) if isinstance(device_order, list): try: diff --git a/Framework/install_handler/ios/simulator.py b/Framework/install_handler/ios/simulator.py new file mode 100644 index 000000000..593d71548 --- /dev/null +++ b/Framework/install_handler/ios/simulator.py @@ -0,0 +1,1056 @@ +import asyncio +import platform +import os +import shutil +import subprocess +import json +import re +import tempfile +from pathlib import Path +from Framework.install_handler.utils import send_response + +async def _send_status(status: str, comment: str): + """Helper to send status responses.""" + await send_response( + { + "action": "status", + "data": { + "category": "iOS", + "name": "Simulator", + "status": status, + "comment": comment, + }, + } + ) + +async def _create_default_device() -> bool: + """ + Attempts to create a default iPhone simulator using the newest available + iOS runtime and the newest available iPhone device type. + """ + await _send_status("installing", "No devices found. Creating a new default simulator...") + + try: + # 1. Get available Runtimes (JSON is safer to parse) + # We need to find the installed iOS runtime ID (e.g. com.apple.CoreSimulator.SimRuntime.iOS-18-0) + runtime_proc = subprocess.run( + ["xcrun", "simctl", "list", "runtimes", "-j"], + capture_output=True, + text=True + ) + if runtime_proc.returncode != 0: + raise Exception("Failed to list runtimes.") + + runtimes_data = json.loads(runtime_proc.stdout) + available_runtimes = runtimes_data.get("runtimes", []) + + # Filter for iOS runtimes and sort to find the latest version + ios_runtimes = [ + r for r in available_runtimes + if r.get("platform") == "iOS" or "iOS" in r.get("name", "") + ] + + if not ios_runtimes: + raise Exception("No iOS Runtimes found after installation.") + + # Sort by version (assuming name contains version or using buildversion) + # We'll just pick the first one which is usually the latest in the list, + # or we can try to sort by the 'version' key if available. + target_runtime = ios_runtimes[-1] # Usually the list is ordered, taking the last one is a safe bet for 'newest' + runtime_id = target_runtime["identifier"] + runtime_name = target_runtime["name"] + + # 2. Get available Device Types + type_proc = subprocess.run( + ["xcrun", "simctl", "list", "devicetypes", "-j"], + capture_output=True, + text=True + ) + types_data = json.loads(type_proc.stdout) + + # Find a modern iPhone (e.g., iPhone 16, 15, or just the last "iPhone" in the list) + device_types = [ + t for t in types_data.get("devicetypes", []) + if "iPhone" in t.get("name", "") + ] + + if not device_types: + raise Exception("No iPhone device types found.") + + # Pick the last one (usually the newest model) + target_device_type = device_types[-1] + device_type_id = target_device_type["identifier"] + device_name = target_device_type["name"] + + # 3. Create the device + new_device_name = f"{device_name} (Default)" + await _send_status("installing", f"Creating {new_device_name} with {runtime_name}...") + + create_proc = subprocess.run( + ["xcrun", "simctl", "create", new_device_name, device_type_id, runtime_id], + capture_output=True, + text=True + ) + + if create_proc.returncode == 0: + new_udid = create_proc.stdout.strip() + await _send_status("installed", f"Created new device: {new_device_name} ({new_udid})") + return True + else: + raise Exception(f"Failed to create device: {create_proc.stderr}") + + except Exception as e: + await _send_status("error", f"Auto-creation of device failed: {e}") + return False + +async def check_status() -> bool: + """Check if iOS Simulator is installed and available.""" + print("[simulator] Checking status...") + + if platform.system().lower() != "darwin": + await _send_status("error", "Unsupported OS. iOS Simulator is only available on macOS.") + return False + + if not os.path.exists("/Applications/Xcode.app"): + await _send_status("not installed", "Xcode must be installed before using iOS Simulator.") + return False + + if not shutil.which("xcrun"): + await _send_status("not installed", "xcrun not found. Xcode command line tools missing.") + return False + + try: + # Check for available devices + result = subprocess.run( + ["xcrun", "simctl", "list", "devices", "available", "iOS"], + capture_output=True, + text=True, + timeout=30, + ) + + if result.returncode != 0: + await _send_status("error", f"simctl error: {result.stderr.strip()}") + return False + + output = result.stdout + # Basic check: verify lines with UDIDs exist + device_lines = [line for line in output.splitlines() if "(" in line and ")" in line] + + if len(device_lines) > 0: + await _send_status("installed", f"iOS Simulator available with {len(device_lines)} devices.") + return True + else: + await _send_status("not installed", "No iOS Simulator devices found.") + return False + + except Exception as e: + await _send_status("error", f"Error checking iOS Simulator: {e}") + return False + +async def _install_command_line_tools() -> bool: + """Install Xcode command line tools if not present.""" + try: + result = subprocess.run(["xcode-select", "-p"], capture_output=True, text=True) + if result.returncode == 0: + return True + + await _send_status("installing", "Installing Xcode command line tools...") + subprocess.run(["xcode-select", "--install"], capture_output=True, text=True) + + # Poll for completion (simplified for brevity) + for _ in range(60): # wait up to 10 mins + await asyncio.sleep(10) + if subprocess.run(["xcode-select", "-p"], capture_output=True).returncode == 0: + return True + + await _send_status("error", "Timed out waiting for command line tools.") + return False + except Exception as e: + await _send_status("error", f"Error installing tools: {e}") + return False + +async def _install_simulator_runtime(user_password: str) -> bool: + """Install iOS Simulator runtime if missing.""" + try: + # Ensure xcode-select is pointing to Xcode app + cmd = ["xcode-select", "--switch", "/Applications/Xcode.app/Contents/Developer"] + if user_password: + # Use sudo -S for password piping if provided + subprocess.run( + f"echo '{user_password}' | sudo -S {' '.join(cmd)}", + shell=True, capture_output=True + ) + else: + subprocess.run(cmd, capture_output=True) + + # Check existing runtimes + await _send_status("installing", "Checking iOS Simulator runtimes...") + + # Download iOS platform using xcodebuild (Your provided block) + await _send_status("installing", "Downloading iOS Simulator runtime (this may take a while)...") + + with subprocess.Popen( + ["xcodebuild", "-downloadPlatform", "iOS"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 + ) as process: + for line in process.stdout: + await _send_status("installing", line.strip()) + + if process.returncode != 0: + # Only fail if it's a real error. Sometimes it fails if already installed. + # We proceed to verification regardless. + pass + + await asyncio.sleep(5) + + # Verify installation + verify_result = subprocess.run( + ["xcrun", "simctl", "list", "devices", "available", "iOS"], + capture_output=True, + text=True, + timeout=30, + ) + + device_count = 0 + if verify_result.returncode == 0: + device_lines = [line for line in verify_result.stdout.splitlines() if "(" in line and ")" in line] + device_count = len(device_lines) + + # LOGIC CHANGE: If installed but no devices, create one. + if device_count > 0: + await _send_status("installed", f"iOS Simulator runtime ready with {device_count} devices.") + return True + else: + # Runtime might be there, but no devices created yet. + # Attempt to create a default device. + return await _create_default_device() + + except Exception as e: + await _send_status("error", f"Error installing simulator runtime: {e}") + return False + +async def get_available_simulators() -> list[dict]: + """ + List available iOS Simulators by running xcrun simctl list devices. + Returns a list of dictionaries with name, udid, and state fields. + """ + try: + if platform.system().lower() != "darwin": + return [] + + if not shutil.which("xcrun"): + return [] + + result = subprocess.run( + ["xcrun", "simctl", "list", "devices", "available", "-j"], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + return [] + + data = json.loads(result.stdout) + simulators = [] + + for runtime_key, devices in data.get("devices", {}).items(): + if "iOS" not in runtime_key: + continue + + # Extract iOS version from runtime key (e.g., "com.apple.CoreSimulator.SimRuntime.iOS-18-2" -> "iOS 18.2") + version_match = re.search(r'iOS-(\d+)-(\d+)', runtime_key) + runtime_version = f"iOS {version_match.group(1)}.{version_match.group(2)}" if version_match else runtime_key + + for device in devices: + if device.get("isAvailable"): + simulators.append({ + "name": device.get("name"), + "udid": device.get("udid"), + "state": device.get("state"), + "runtime": runtime_version, + "comment": f"{device.get('name')} - {runtime_version} ({device.get('state')})" + }) + + return simulators + + except Exception as e: + print(f"[simulator] Error listing simulators: {e}") + return [] + + +async def get_available_device_types() -> list[dict]: + """ + Get available iOS device types by running xcrun simctl list devicetypes. + Returns a list of dictionaries with package (device type ID), version (device name), and description. + Format matches Android's device list for consistency. + """ + try: + if platform.system().lower() != "darwin": + return [] + + if not shutil.which("xcrun"): + return [] + + result = subprocess.run( + ["xcrun", "simctl", "list", "devicetypes", "-j"], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + return [] + + data = json.loads(result.stdout) + device_types = [] + + for device_type in data.get("devicetypes", []): + name = device_type.get("name", "") + identifier = device_type.get("identifier", "") + + # Filter for iPhone and iPad devices only + if "iPhone" in name or "iPad" in name: + device_types.append({ + "package": identifier, # device type ID + "version": name, # device name + "description": "Apple" # OEM + }) + + return device_types + + except Exception as e: + print(f"[simulator] Error listing device types: {e}") + return [] + + +async def get_available_runtimes() -> list[dict]: + """ + Get available iOS runtimes by running xcrun simctl list runtimes. + Returns a list of dictionaries with runtime ID, version, and availability. + """ + try: + if platform.system().lower() != "darwin": + return [] + + if not shutil.which("xcrun"): + return [] + + result = subprocess.run( + ["xcrun", "simctl", "list", "runtimes", "-j"], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + return [] + + data = json.loads(result.stdout) + runtimes = [] + + for runtime in data.get("runtimes", []): + platform_name = runtime.get("platform", "") + name = runtime.get("name", "") + + # Filter for iOS runtimes only + if platform_name == "iOS" or "iOS" in name: + runtimes.append({ + "identifier": runtime.get("identifier", ""), + "name": name, + "version": runtime.get("version", ""), + "isAvailable": runtime.get("isAvailable", False) + }) + + return runtimes + + except Exception as e: + print(f"[simulator] Error listing runtimes: {e}") + return [] + + +async def ios_simulator_install(): + """ + Get available device types when install button is clicked. + Returns list of available iOS device types for simulator creation. + """ + print("[simulator] Getting available device types...") + + try: + # Check if macOS + if platform.system().lower() != "darwin": + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": "Device Types", + "status": "Not Found", + "comment": "iOS Simulator is only available on macOS", + "installables": [] + } + }) + return False + + # Check if Xcode is installed + if not os.path.exists("/Applications/Xcode.app"): + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": "Device Types", + "status": "Not Found", + "comment": "Xcode must be installed first", + "installables": [] + } + }) + return False + + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": "Device Types", + "status": "Fetching", + "comment": "Fetching available device types...", + "installables": [] + } + }) + + # Get available device types + device_types = await get_available_device_types() + print(f"[simulator] Available device types: {device_types}") + + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "installables": device_types, + } + }) + return True + + except Exception as e: + print(f"[simulator] Error getting device types: {e}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": "Device Types", + "status": "error", + "comment": f"Error: {e}", + "installables": [] + } + }) + return False + + +async def get_filtered_simulator_services(): + """ + Get available iOS simulators and return a formatted category dictionary. + Similar to Android's get_filtered_avd_services. + + Returns: + Dictionary with category "iOSSimulator" and filtered services, or None if no simulators + """ + current_os = platform.system().lower() + + # iOS simulators only available on macOS + if current_os != "darwin": + return None + + try: + simulators = await get_available_simulators() + + if not simulators: + return None + + # Format simulators as services + services_list = [] + for sim in simulators: + services_list.append({ + "name": sim["udid"], # Use UDID as the identifier + "status": "installed", + "comment": sim["comment"], + "install_text": "delete", + "install_function": "delete_simulator", + "os": ["darwin"], + "user_password": "no", + }) + + return { + "group": { + "check_text": "", + "install_text": "", + }, + "category": "iOSSimulator", + "services": services_list, + } + + except Exception as e: + print(f"[simulator] Error getting filtered simulators: {e}") + return None + + +async def delete_simulator(udid: str) -> bool: + """ + Delete iOS Simulator by UDID using simctl delete. + Sends response to server on success or failure. + + Args: + udid: The UDID of the simulator to delete + + Returns: + bool: True if successful, False otherwise + """ + try: + # Get simulator info first to show proper name in messages + simulators = await get_available_simulators() + simulator_info = next((s for s in simulators if s["udid"] == udid), None) + + if not simulator_info: + error_msg = f"Simulator {udid} not found" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "not installed", + "comment": error_msg, + } + }) + return False + + simulator_name = simulator_info["name"] + print(f"[simulator] Deleting simulator: {simulator_name} ({udid})") + + # Send deleting status + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "uninstalling", + "comment": f"Deleting simulator {simulator_name}...", + } + }) + + # Delete the simulator + result = subprocess.run( + ["xcrun", "simctl", "delete", udid], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + error_msg = f"Failed to delete simulator: {result.stderr.strip()}" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + + print(f"[simulator] Simulator deleted successfully: {simulator_name} ({udid})") + + # Send success response + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "not installed", + "comment": f"Simulator '{simulator_name}' deleted successfully", + } + }) + return True + + except subprocess.TimeoutExpired: + error_msg = "Simulator deletion timed out" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + except Exception as e: + error_msg = f"Error deleting simulator: {e}" + print(f"[simulator] {error_msg}") + import traceback + traceback.print_exc() + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + + +async def launch_simulator(udid: str) -> bool: + """ + Launch iOS Simulator using simctl boot and open Simulator app. + Checks for WebDriverAgent installation and installs if needed. + Launches WebDriverAgent app automatically. + Non-blocking - the simulator starts in the background. + Sends response to server on success or failure. + """ + try: + # Get simulator info first + simulators = await get_available_simulators() + simulator_info = next((s for s in simulators if s["udid"] == udid), None) + + if not simulator_info: + error_msg = f"Simulator {udid} not found" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "not installed", + "comment": error_msg, + } + }) + return False + + simulator_name = simulator_info['name'] + is_already_booted = simulator_info["state"] == "Booted" + + # Boot simulator if not already booted + if not is_already_booted: + # Open Simulator app + subprocess.Popen( + ["open", "-a", "Simulator"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + + # Small delay to let Simulator app launch + await asyncio.sleep(1) + + # Boot the specific device + result = subprocess.run( + ["xcrun", "simctl", "boot", udid], + capture_output=True, + text=True + ) + + if result.returncode != 0: + error_msg = f"Failed to boot simulator: {result.stderr.strip()}" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + + print(f"[simulator] Booting simulator: {simulator_name}...") + + # Wait for boot to complete + await asyncio.sleep(3) + else: + print(f"[simulator] Simulator {simulator_name} already running") + + # Check if WebDriverAgent is installed on this simulator + print(f"[simulator] Checking WebDriverAgent installation on {simulator_name}...") + check_wda = subprocess.run( + ["xcrun", "simctl", "get_app_container", udid, "com.facebook.WebDriverAgentRunner.xctrunner"], + capture_output=True, + text=True + ) + + wda_installed = check_wda.returncode == 0 and check_wda.stdout.strip() + + if not wda_installed: + print(f"[simulator] WebDriverAgent not found on {simulator_name}, installing...") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "installing", + "comment": f"Installing WebDriverAgent on {simulator_name}...", + } + }) + + # Get WebDriverAgent path (same as webdriver.py uses) + home = Path.home() + webdriver_path = home / ".zeuz" / "WebDriverAgent" + + # Check if WebDriverAgent repo exists + if not (webdriver_path / "WebDriverAgent.xcodeproj").exists(): + print(f"[simulator] Cloning WebDriverAgent repository...") + if webdriver_path.exists(): + shutil.rmtree(webdriver_path) + webdriver_path.parent.mkdir(parents=True, exist_ok=True) + + clone_result = subprocess.run( + ["git", "clone", "--depth", "1", "https://github.com/appium/WebDriverAgent.git", str(webdriver_path)], + capture_output=True, + text=True + ) + + if clone_result.returncode != 0: + error_msg = f"Failed to clone WebDriverAgent: {clone_result.stderr}" + print(f"[simulator] {error_msg}") + # Continue with launching simulator even if WDA install fails + + # Build and install WebDriverAgent for this simulator + if (webdriver_path / "WebDriverAgent.xcodeproj").exists(): + # Check if app is already built (in standard location) + standard_build_path = webdriver_path / "Build" / "Products" / "Debug-iphonesimulator" / "WebDriverAgentRunner-Runner.app" + app_path_to_install = None + + if standard_build_path.exists(): + print(f"[simulator] Found pre-built WebDriverAgent at {standard_build_path}") + app_path_to_install = standard_build_path + else: + # Need to build + print(f"[simulator] Building WebDriverAgent for {simulator_name}...") + + with tempfile.TemporaryDirectory() as derived_data_path: + build_cmd = [ + "xcodebuild", + "-project", "WebDriverAgent.xcodeproj", + "-scheme", "WebDriverAgentRunner", + "-destination", f"platform=iOS Simulator,id={udid}", + "-derivedDataPath", derived_data_path, + "-allowProvisioningUpdates", + "build-for-testing" + ] + + build_result = subprocess.run( + build_cmd, + cwd=str(webdriver_path), + capture_output=True, + text=True, + timeout=1200 + ) + + if build_result.returncode == 0: + app_path = Path(derived_data_path) / "Build" / "Products" / "Debug-iphonesimulator" / "WebDriverAgentRunner-Runner.app" + + if app_path.exists(): + # Copy to standard location before temp directory is deleted + standard_build_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copytree(app_path, standard_build_path, dirs_exist_ok=True) + print(f"[simulator] Copied built app to {standard_build_path}") + app_path_to_install = standard_build_path + else: + print(f"[simulator] WebDriverAgent app not found at {app_path}") + else: + print(f"[simulator] WebDriverAgent build failed: {build_result.stderr[-500:]}") + + # Install the app if we have it + if app_path_to_install: + print(f"[simulator] Installing WebDriverAgent on {simulator_name}...") + install_result = subprocess.run( + ["xcrun", "simctl", "install", udid, str(app_path_to_install)], + capture_output=True, + text=True + ) + + if install_result.returncode == 0: + print(f"[simulator] WebDriverAgent installed successfully on {simulator_name}") + wda_installed = True + else: + print(f"[simulator] Failed to install WebDriverAgent: {install_result.stderr}") + else: + print(f"[simulator] WebDriverAgent already installed on {simulator_name}") + + # Launch WebDriverAgent if installed + if wda_installed: + print(f"[simulator] Launching WebDriverAgent on {simulator_name}...") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "installing", + "comment": f"Launching WebDriverAgent on {simulator_name}...", + } + }) + + # Launch WebDriverAgent app + launch_wda = subprocess.run( + ["xcrun", "simctl", "launch", udid, "com.facebook.WebDriverAgentRunner.xctrunner"], + capture_output=True, + text=True + ) + + if launch_wda.returncode == 0: + print(f"[simulator] WebDriverAgent launched successfully on {simulator_name}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "installed", + "comment": f"Simulator {simulator_name} running with WebDriverAgent", + } + }) + else: + print(f"[simulator] Failed to launch WebDriverAgent: {launch_wda.stderr}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "installed", + "comment": f"Simulator {simulator_name} running (WebDriverAgent launch failed)", + } + }) + else: + # Simulator is running but WDA not available + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "installed", + "comment": f"Simulator {simulator_name} is running", + } + }) + + return True + + except subprocess.TimeoutExpired: + error_msg = "Operation timed out" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + except Exception as e: + error_msg = f"Failed to launch simulator: {e}" + print(f"[simulator] {error_msg}") + import traceback + traceback.print_exc() + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": udid, + "status": "error", + "comment": error_msg, + } + }) + return False + + +async def create_simulator_from_device_type(device_param: str) -> bool: + """ + Create iOS Simulator from device type ID and device name. + Uses the highest available iOS runtime. + + Args: + device_param: Format "install device;device_type_id;device_name" + Example: "install device;com.apple.CoreSimulator.SimDeviceType.iPhone-16;iPhone 16" + + Returns: + bool: True if successful, False otherwise + """ + try: + # Parse device parameter + parts = device_param.split(";") + if len(parts) < 3: + error_msg = f"Invalid device parameter format. Expected 'install device;device_type_id;device_name', got: {device_param}" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": "Unknown", + "status": "error", + "comment": error_msg, + } + }) + return False + + device_type_id = parts[1].strip() + device_name = parts[2].strip() + + print(f"[simulator] Creating simulator '{device_name}' with device type '{device_type_id}'") + + # Get available runtimes + runtimes = await get_available_runtimes() + if not runtimes: + error_msg = "No iOS runtimes found. Please install Xcode and iOS runtime first." + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name, + "status": "not installed", + "comment": error_msg, + } + }) + return False + + # Get the latest available runtime + available_runtimes = [r for r in runtimes if r.get("isAvailable", False)] + if not available_runtimes: + error_msg = "No available iOS runtimes found." + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name, + "status": "not installed", + "comment": error_msg, + } + }) + return False + + # Use the last runtime (usually the newest) + runtime = available_runtimes[-1] + runtime_id = runtime["identifier"] + runtime_name = runtime["name"] + + print(f"[simulator] Using runtime: {runtime_name} ({runtime_id})") + + # Generate a unique simulator name + existing_sims = await get_available_simulators() + existing_names = [s["name"] for s in existing_sims] + + # Create unique name + simulator_name = device_name + counter = 1 + while simulator_name in existing_names: + simulator_name = f"{device_name} ({counter})" + counter += 1 + + # Send installing status + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name, + "status": "installing", + "comment": f"Creating {simulator_name} with {runtime_name}...", + } + }) + + # Create the simulator + result = subprocess.run( + ["xcrun", "simctl", "create", simulator_name, device_type_id, runtime_id], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + error_msg = f"Failed to create simulator: {result.stderr.strip()}" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name, + "status": "not installed", + "comment": error_msg, + } + }) + return False + + new_udid = result.stdout.strip() + print(f"[simulator] Simulator created successfully: {simulator_name} ({new_udid})") + + # Send success response + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name, + "status": "installed", + "comment": f"Simulator '{simulator_name}' created successfully ({new_udid})", + } + }) + + return True + + except subprocess.TimeoutExpired: + error_msg = "Simulator creation timed out" + print(f"[simulator] {error_msg}") + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name if 'device_name' in locals() else "Unknown", + "status": "error", + "comment": error_msg, + } + }) + return False + except Exception as e: + error_msg = f"Error creating simulator: {e}" + print(f"[simulator] {error_msg}") + import traceback + traceback.print_exc() + await send_response({ + "action": "status", + "data": { + "category": "iOSSimulator", + "name": device_name if 'device_name' in locals() else "Unknown", + "status": "error", + "comment": error_msg, + } + }) + return False + + +async def install(user_password: str = "") -> bool: + """Main install entry point.""" + print("[simulator] Installing...") + + if platform.system().lower() != "darwin": + await _send_status("error", "iOS Simulator is only available on macOS.") + return False + + if await check_status(): + return True + + if not os.path.exists("/Applications/Xcode.app"): + await _send_status("error", "Xcode must be installed first.") + return False + + await _send_status("installing", "Setting up iOS Simulator...") + + if not await _install_command_line_tools(): + return False + + if not await _install_simulator_runtime(user_password): + return False + + return await check_status() \ No newline at end of file diff --git a/Framework/install_handler/ios/webdriver.py b/Framework/install_handler/ios/webdriver.py new file mode 100644 index 000000000..6e5d0801e --- /dev/null +++ b/Framework/install_handler/ios/webdriver.py @@ -0,0 +1,250 @@ +import platform +import os +import shutil +import subprocess +import json +import tempfile +import asyncio +from pathlib import Path +from Framework.install_handler.utils import send_response + +async def _send_status(status: str, comment: str): + """Helper to send status responses.""" + print(f"[{status}] {comment}") + await send_response( + { + "action": "status", + "data": { + "category": "iOS", + "name": "WebDriver", + "status": status, + "comment": comment, + }, + } + ) + +def _get_webdriver_path() -> Path: + home = Path.home() + return home / ".zeuz" / "WebDriverAgent" + +async def _check_xcode_installed() -> bool: + if not os.path.exists("/Applications/Xcode.app"): + return False + return shutil.which("xcodebuild") is not None + +async def _get_best_simulator() -> tuple[str, str] | None: + """Finds the best candidate simulator (preferring iPhones).""" + try: + result = subprocess.run( + ["xcrun", "simctl", "list", "devices", "available", "-j"], + capture_output=True, text=True + ) + + if result.returncode != 0: + return None + + data = json.loads(result.stdout) + candidates = [] + + for runtime_key, devices in data.get("devices", {}).items(): + if "iOS" not in runtime_key: + continue + for device in devices: + if device.get("isAvailable"): + name = device.get("name") + uuid = device.get("udid") + score = 2 if "iPhone" in name else 1 + candidates.append((score, name, uuid)) + + candidates.sort(key=lambda x: x[0], reverse=True) + if candidates: + return candidates[0][1], candidates[0][2] + return None + except Exception as e: + print(f"Error listing simulators: {e}") + return None + +async def _boot_simulator_if_needed(device_uuid: str) -> bool: + """Boots the simulator ONLY if it is currently shutdown.""" + try: + # 1. Check current state + list_res = subprocess.run( + ["xcrun", "simctl", "list", "devices", "-j"], + capture_output=True, text=True + ) + if list_res.returncode == 0: + data = json.loads(list_res.stdout) + for runtime, devices in data.get("devices", {}).items(): + for device in devices: + if device.get("udid") == device_uuid: + if device.get("state") == "Booted": + # Already booted + return True + + # 2. Boot if needed + await _send_status("installing", "Booting simulator to perform check/install...") + subprocess.run(["open", "-a", "Simulator"], capture_output=True) + + # Give the app a moment to launch + await asyncio.sleep(2) + + subprocess.run(["xcrun", "simctl", "boot", device_uuid], capture_output=True) + + # 3. Wait for 'Booted' status + res = subprocess.run( + ["xcrun", "simctl", "bootstatus", device_uuid], + capture_output=True, timeout=120 + ) + return res.returncode == 0 + except Exception as e: + print(f"Boot exception: {e}") + # We return True here to attempt the next step anyway, + # as sometimes bootstatus fails even if the device works. + return True + +async def check_status() -> bool: + """Checks if WebDriverAgent is installed (Ensures Simulator is ON).""" + print("[webdriver] Checking status...") + + if platform.system().lower() != "darwin": + await _send_status("error", "Unsupported OS.") + return False + + if not await _check_xcode_installed(): + await _send_status("not installed", "Xcode missing.") + return False + + webdriver_path = _get_webdriver_path() + if not (webdriver_path / "WebDriverAgent.xcodeproj").exists(): + await _send_status("not installed", "Repo not cloned.") + return False + + sim_info = await _get_best_simulator() + if not sim_info: + await _send_status("not installed", "No iOS Simulator devices found.") + return False + + name, uuid = sim_info + + if not await _boot_simulator_if_needed(uuid): + await _send_status("error", f"Failed to boot {name} for verification.") + return False + + # Check if app container exists + cmd = ["xcrun", "simctl", "get_app_container", uuid, "com.facebook.WebDriverAgentRunner.xctrunner"] + check_res = subprocess.run(cmd, capture_output=True, text=True) + + if check_res.returncode == 0 and check_res.stdout.strip(): + await _send_status("installed", f"WebDriverAgent found on {name} ({uuid}).") + return True + else: + await _send_status("not installed", f"WebDriverAgent not found on {name} ({uuid}).") + return False + +async def _build_and_install_webdriver(webdriver_path: Path) -> bool: + try: + sim_info = await _get_best_simulator() + if not sim_info: return False + name, uuid = sim_info + + # Ensure booted (check_status might have done this, but good to double check) + if not await _boot_simulator_if_needed(uuid): + await _send_status("error", "Failed to boot simulator.") + return False + + await _send_status("installing", f"Building WebDriverAgent for {name}...") + + # Create a temp directory to store build artifacts + with tempfile.TemporaryDirectory() as derived_data_path: + cmd = [ + "xcodebuild", + "-project", "WebDriverAgent.xcodeproj", + "-scheme", "WebDriverAgentRunner", + "-destination", f"platform=iOS Simulator,id={uuid}", + "-derivedDataPath", derived_data_path, + "-allowProvisioningUpdates", + "build-for-testing" + ] + + result = subprocess.run( + cmd, cwd=str(webdriver_path), + capture_output=True, text=True, timeout=1200 + ) + + if result.returncode != 0: + err = result.stderr[-500:] if result.stderr else "Unknown error" + await _send_status("error", f"Build failed: {err}") + return False + + # Explicitly Install the Built App + await _send_status("installing", "Installing WebDriverAgentRunner-Runner.app...") + + app_path = Path(derived_data_path) / "Build" / "Products" / "Debug-iphonesimulator" / "WebDriverAgentRunner-Runner.app" + + if not app_path.exists(): + await _send_status("error", f"Could not find built app at {app_path}") + return False + + install_res = subprocess.run( + ["xcrun", "simctl", "install", uuid, str(app_path)], + capture_output=True, text=True + ) + + if install_res.returncode != 0: + await _send_status("error", f"Install failed: {install_res.stderr.strip()}") + return False + + await _send_status("installing", "App installed successfully via simctl.") + return True + + except Exception as e: + await _send_status("error", f"Build/Install exception: {e}") + return False + +async def _clone_repository(webdriver_path: Path) -> bool: + try: + if webdriver_path.exists(): + shutil.rmtree(webdriver_path) + webdriver_path.parent.mkdir(parents=True, exist_ok=True) + + await _send_status("installing", "Cloning WebDriverAgent...") + subprocess.run( + ["git", "clone", "--depth", "1", "https://github.com/appium/WebDriverAgent.git", str(webdriver_path)], + check=True, capture_output=True + ) + return True + except Exception as e: + await _send_status("error", f"Clone failed: {e}") + return False + +async def _bootstrap_webdriver(webdriver_path: Path): + script = webdriver_path / "Scripts" / "bootstrap.sh" + if script.exists(): + try: + await _send_status("installing", "Running bootstrap...") + subprocess.run(["bash", str(script)], cwd=str(webdriver_path), capture_output=True) + except: pass + +async def install() -> bool: + print("[webdriver] Starting installation...") + + if await check_status(): + return True + + if not await _check_xcode_installed(): + await _send_status("error", "Xcode required.") + return False + + webdriver_path = _get_webdriver_path() + + if not await _clone_repository(webdriver_path): return False + await _bootstrap_webdriver(webdriver_path) + + if not await _build_and_install_webdriver(webdriver_path): return False + + if await check_status(): + await _send_status("installed", "WebDriverAgent installed successfully.") + return True + else: + await _send_status("error", "Installation completed but verification failed.") + return False \ No newline at end of file diff --git a/Framework/install_handler/ios/xcode.py b/Framework/install_handler/ios/xcode.py index 96b9ac388..08ebddee1 100644 --- a/Framework/install_handler/ios/xcode.py +++ b/Framework/install_handler/ios/xcode.py @@ -1,7 +1,15 @@ -async def check_status(): +from Framework.install_handler.macos.common import xcode_check_status, xcode_install + + +async def check_status() -> bool: + """Check if Xcode is installed and license is accepted.""" print("[xcode] Checking status...") + return await xcode_check_status("iOS") + -async def install(): +async def install(user_password: str = "") -> bool: + """Install Xcode via App Store and accept license.""" print("[xcode] Installing...") + return await xcode_install("iOS", user_password) diff --git a/Framework/install_handler/linux/atspi.py b/Framework/install_handler/linux/atspi.py new file mode 100644 index 000000000..9df1493d8 --- /dev/null +++ b/Framework/install_handler/linux/atspi.py @@ -0,0 +1,189 @@ +import os +from Framework.install_handler.utils import send_response +from .linux_utils import ( + detect_package_manager, + check_all_packages_installed, + install_packages, +) + + +# Package definitions for different package managers +PACKAGES = { + "apt": [ + "build-essential", + "cmake", + "pkg-config", + "libgirepository1.0-dev", + "libcairo2-dev", + "xdotool", + ], + "dnf": [ + "cmake", + "pkgconf-pkg-config", + "gobject-introspection-devel", + "cairo-devel", + "python3-devel", + "cairo-gobject-devel", + "xdotool", + ], + "pacman": [ + "gcc", + "meson", + "cmake", + "pkgconf", + "cairo", + "xdotool", + "gobject-introspection", + ], +} + + +async def check_status(): + """Checks if AT-SPI development packages are installed.""" + print("Checking AT-SPI development packages status...") + + # Check if session type is X11 + session_type = os.environ.get("XDG_SESSION_TYPE", "unknown") + if session_type != "x11": + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "error", + "comment": f"Only X11 is supported. Current session type: {session_type}.", + }, + } + ) + return False + + package_manager, _ = detect_package_manager() + + if not package_manager: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "error", + "comment": "Unsupported package manager. Only apt, dnf, and pacman are supported.", + }, + } + ) + return False + + packages = PACKAGES.get(package_manager, []) + if check_all_packages_installed(package_manager, packages): + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "installed", + "comment": "AT-SPI development packages are installed.", + }, + } + ) + return True + else: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "not installed", + "comment": f"Install AT-SPI packages using {package_manager}.", + }, + } + ) + return False + + +async def install(user_password: str = ""): + """Install AT-SPI development packages using the system package manager.""" + print("Installing AT-SPI development packages...") + + # Check if session type is X11 + session_type = os.environ.get("XDG_SESSION_TYPE", "unknown") + if session_type != "x11": + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "error", + "comment": f"Only X11 is supported. Current session type: {session_type}.", + }, + } + ) + return False + + is_already_installed = await check_status() + + if not is_already_installed: + package_manager, _ = detect_package_manager() + + if not package_manager: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "error", + "comment": "Unsupported package manager. Only apt, dnf, and pacman are supported.", + }, + } + ) + return False + + packages = PACKAGES.get(package_manager, []) + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "installing", + "comment": f"Installing packages using {package_manager}, please wait...", + }, + } + ) + + success, error_msg = install_packages( + package_manager, packages, user_password, timeout=3600 + ) + + if success: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "installed", + "comment": "AT-SPI packages have been installed successfully.", + }, + } + ) + return True + else: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "AT-SPI Packages", + "status": "error", + "comment": f"Installation failed. Error: {error_msg}", + }, + } + ) + return False + else: + return True diff --git a/Framework/install_handler/linux/linux_utils.py b/Framework/install_handler/linux/linux_utils.py new file mode 100644 index 000000000..603e36a17 --- /dev/null +++ b/Framework/install_handler/linux/linux_utils.py @@ -0,0 +1,160 @@ +"""Shared utilities for Linux package management.""" + +import subprocess +import shutil + + +def detect_package_manager(): + """Detect the available package manager on the system. + + Returns: + tuple: (package_manager_name, None) where package_manager_name is one of "apt", "dnf", "pacman", or None + """ + if shutil.which("apt-get"): + return "apt", None + elif shutil.which("dnf"): + return "dnf", None + elif shutil.which("pacman"): + return "pacman", None + else: + return None, None + + +def check_package_installed(package_manager, package): + """Check if a package is installed using the appropriate package manager. + + Args: + package_manager: One of "apt", "dnf", or "pacman" + package: Package name to check + + Returns: + bool: True if package is installed, False otherwise + """ + try: + if package_manager == "apt": + result = subprocess.run( + ["dpkg", "-s", package], capture_output=True, text=True, timeout=30 + ) + return result.returncode == 0 + elif package_manager == "dnf": + result = subprocess.run( + ["rpm", "-q", package], capture_output=True, text=True, timeout=30 + ) + return result.returncode == 0 + elif package_manager == "pacman": + result = subprocess.run( + ["pacman", "-Q", package], capture_output=True, text=True, timeout=30 + ) + return result.returncode == 0 + except Exception: + pass + return False + + +def check_all_packages_installed(package_manager, packages): + """Check if all packages in the list are installed. + + Args: + package_manager: One of "apt", "dnf", or "pacman" + packages: List of package names to check + + Returns: + bool: True if all packages are installed, False otherwise + """ + for package in packages: + if not check_package_installed(package_manager, package): + return False + return True + + +def install_packages(package_manager, packages, user_password="", timeout=3600): + """Install packages using the system package manager. + + Args: + package_manager: One of "apt", "dnf", or "pacman" + packages: List of package names to install + user_password: Optional sudo password for authentication + timeout: Timeout in seconds for the installation (default: 3600 = 1 hour) + + Returns: + tuple: (success: bool, error_message: str) + """ + try: + # Build the install command based on package manager + if package_manager == "apt": + if user_password: + cmd = ["sudo", "-S", "apt-get", "install", "-y"] + packages + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + _, stderr = process.communicate( + input=f"{user_password}\n", timeout=timeout + ) + success = process.returncode == 0 + else: + cmd = ["sudo", "apt-get", "install", "-y"] + packages + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout + ) + success = result.returncode == 0 + stderr = result.stderr + + elif package_manager == "dnf": + if user_password: + cmd = ["sudo", "-S", "dnf", "install", "-y"] + packages + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + _, stderr = process.communicate( + input=f"{user_password}\n", timeout=timeout + ) + success = process.returncode == 0 + else: + cmd = ["sudo", "dnf", "install", "-y"] + packages + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout + ) + success = result.returncode == 0 + stderr = result.stderr + + elif package_manager == "pacman": + if user_password: + cmd = ["sudo", "-S", "pacman", "-S", "--noconfirm"] + packages + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + _, stderr = process.communicate( + input=f"{user_password}\n", timeout=timeout + ) + success = process.returncode == 0 + else: + cmd = ["sudo", "pacman", "-S", "--noconfirm"] + packages + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout + ) + success = result.returncode == 0 + stderr = result.stderr + else: + return False, "Unknown package manager" + + if success: + return True, "" + else: + return False, stderr + + except subprocess.TimeoutExpired: + return False, "Installation timed out" + except Exception as e: + return False, str(e) diff --git a/Framework/install_handler/linux/xwd.py b/Framework/install_handler/linux/xwd.py new file mode 100644 index 000000000..561a5e20e --- /dev/null +++ b/Framework/install_handler/linux/xwd.py @@ -0,0 +1,175 @@ +import os +from Framework.install_handler.utils import send_response +from .linux_utils import ( + detect_package_manager, + check_all_packages_installed, + install_packages, +) + + +# Package definitions for different package managers +PACKAGES = { + "apt": [ + "x11-apps", # provides xwd + "imagemagick", # provides convert, import + "wmctrl", + ], + "dnf": [ + "xorg-x11-utils", # provides xwd on some distros + "ImageMagick", + "wmctrl", + ], + "pacman": [ + "imagemagick", + "wmctrl", + ], +} + + +async def check_status(): + """Checks if Screen Capture Utilities are installed.""" + + # Check if session type is X11 + session_type = os.environ.get("XDG_SESSION_TYPE", "unknown") + if session_type != "x11": + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "error", + "comment": f"Only X11 is supported. Current session type: {session_type}.", + }, + } + ) + return False + + package_manager, _ = detect_package_manager() + + if not package_manager: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "error", + "comment": "Unsupported package manager. Only apt, dnf, and pacman are supported.", + }, + } + ) + return False + + packages = PACKAGES.get(package_manager, []) + if check_all_packages_installed(package_manager, packages): + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "installed", + "comment": "Screen Capture Utilities are installed.", + }, + } + ) + return True + else: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "not installed", + "comment": f"Install Screen Capture Utilities using {package_manager}.", + }, + } + ) + return False + + +async def install(user_password: str = ""): + """Install Screen Capture Utilities using the system package manager.""" + + # Check if session type is X11 + session_type = os.environ.get("XDG_SESSION_TYPE", "unknown") + if session_type != "x11": + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "error", + "comment": f"Only X11 is supported. Current session type: {session_type}.", + }, + } + ) + return False + + is_already_installed = await check_status() + + if not is_already_installed: + package_manager, _ = detect_package_manager() + + if not package_manager: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "error", + "comment": "Unsupported package manager. Only apt, dnf, and pacman are supported.", + }, + } + ) + return False + + packages = PACKAGES.get(package_manager, []) + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "installing", + "comment": f"Installing packages using {package_manager}, please wait...", + }, + } + ) + + success, error_msg = install_packages( + package_manager, packages, user_password, timeout=300 + ) + + if success: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "installed", + "comment": "Screen Capture Utilities have been installed successfully.", + }, + } + ) + return True + else: + await send_response( + { + "action": "status", + "data": { + "category": "Linux", + "name": "Screen Capture Utilities", + "status": "error", + "comment": f"Installation failed. Error: {error_msg}", + }, + } + ) + return False + else: + return True diff --git a/Framework/install_handler/long_poll_handler.py b/Framework/install_handler/long_poll_handler.py index 8eb5d02eb..d136826b8 100644 --- a/Framework/install_handler/long_poll_handler.py +++ b/Framework/install_handler/long_poll_handler.py @@ -3,13 +3,13 @@ import random import httpx import inspect -import platform from colorama import Fore from Framework.install_handler.route import Response, services from Framework.install_handler.utils import debug, send_response, read_node_id, generate_services_list from Framework.Utilities import RequestFormatter, ConfigModule from Framework.node_server_state import STATE from Framework.install_handler.android.emulator import create_avd_from_system_image, get_filtered_avd_services, get_available_avds, launch_avd +from Framework.install_handler.ios.simulator import create_simulator_from_device_type, get_filtered_simulator_services, launch_simulator from Framework.install_handler.system_info.system_info import get_formatted_system_info if debug: @@ -31,10 +31,17 @@ async def on_message(self, message: Response) -> None: if action == "services_list": services_list = generate_services_list(services) + # Add Android AVD list avd_list = await get_filtered_avd_services() if avd_list: services_list.insert(1, avd_list) + # Add iOS Simulator list (insert after AVD list if present, or at index 1) + simulator_list = await get_filtered_simulator_services() + if simulator_list: + insert_index = 2 if avd_list else 1 + services_list.insert(insert_index, simulator_list) + await send_response({ "action": "services_list", "data": { @@ -103,6 +110,42 @@ async def on_message(self, message: Response) -> None: return return + # Handle iOSSimulator category + if category["category"] == "iOSSimulator": + print(f"[installer] iOSSimulator category: {category}") + print(f"[installer] iOSSimulator services: {category['services']}") + print(f"[installer] Requested service name: {message.value.item.name}") + service_name = message.value.item.name + + # Case 1: No service name or empty - get device types list + if not service_name: + if action == "install" and "install_function" in category and category["install_function"]: + func = category["install_function"] + await func() + return + else: + print(f"[installer] No install_function found for iOSSimulator category") + return + + # Case 2: Service name is a device type (starts with "install device;") + if service_name.startswith("install device;"): + if action == "install": + await create_simulator_from_device_type(service_name) + return + else: + print(f"[installer] Status check not supported for device types") + return + + # Case 3: This is a request to launch an existing simulator (UDID format) + else: + try: + await launch_simulator(service_name) + except Exception as e: + print(f"[installer] Error launching simulator '{service_name}': {e}") + traceback.print_exc() + return + return + # Normal service-level install for other categories service = [i for i in category["services"] if i["name"] == message.value.item.name][0] if action == "install": diff --git a/Framework/install_handler/macos/common.py b/Framework/install_handler/macos/common.py new file mode 100644 index 000000000..29e593d29 --- /dev/null +++ b/Framework/install_handler/macos/common.py @@ -0,0 +1,171 @@ +import asyncio +import platform +import os +import shutil +import subprocess +from Framework.install_handler.utils import send_response + + +async def _send_status(category, status: str, comment: str): + """Helper to send status responses.""" + await send_response( + { + "action": "status", + "data": { + "category": category, + "name": "Xcode", + "status": status, + "comment": comment, + }, + } + ) + + +async def xcode_check_status(category) -> bool: + """Check if Xcode is installed and license is accepted.""" + print("[xcode] Checking status...") + + if platform.system().lower() != "darwin": + await _send_status( + category, "error", "Unsupported OS. Xcode is only available on macOS." + ) + return False + + xcodebuild_path = shutil.which("xcodebuild") + if not xcodebuild_path or not os.path.exists("/Applications/Xcode.app"): + await _send_status(category, "not installed", "Xcode is not installed.") + return False + + try: + # Check version + result = subprocess.run( + [xcodebuild_path, "-version"], capture_output=True, text=True, timeout=30 + ) + if result.returncode != 0: + await _send_status( + category, "error", f"xcodebuild error: {result.stderr.strip()}" + ) + return False + + version = result.stdout.splitlines()[0].strip() if result.stdout else "unknown" + + # Check license + license_result = subprocess.run( + [xcodebuild_path, "-checkFirstLaunchStatus"], + capture_output=True, + text=True, + timeout=15, + ) + + if license_result.returncode == 0: + await _send_status(category, "installed", f"{version}") + return True + else: + await _send_status( + category, + "not installed", + f"{version} installed but license not accepted.", + ) + return False + + except subprocess.TimeoutExpired: + await _send_status(category, "error", "xcodebuild timed out.") + return False + except Exception as e: + await _send_status(category, "error", f"Error checking Xcode: {e}") + return False + + +async def _accept_license(category, user_password: str) -> bool: + """Attempt to accept Xcode license using sudo.""" + xcodebuild_path = shutil.which("xcodebuild") or "xcodebuild" + + if not user_password: + await _send_status( + category, "error", "Password required to accept Xcode license." + ) + return False + + try: + result = subprocess.run( + ["sudo", "-S", xcodebuild_path, "-license", "accept"], + input=user_password + "\n", + capture_output=True, + text=True, + timeout=60, + ) + + if result.returncode == 0: + await _send_status( + category, "installed", "Xcode license accepted successfully." + ) + return True + else: + await _send_status( + category, "error", f"Failed to accept license: {result.stderr.strip()}" + ) + return False + except Exception as e: + await _send_status(category, "error", f"Error accepting license: {e}") + return False + + +async def xcode_install(category, user_password: str = "") -> bool: + """Install Xcode via App Store and accept license.""" + print("[xcode] Installing...") + + if platform.system().lower() != "darwin": + await _send_status( + category, "error", "Unsupported OS. Xcode is only available on macOS." + ) + return False + + if await xcode_check_status(category): + return True + + # Open App Store + await _send_status( + category, "installing", "Opening App Store. Please install Xcode and wait..." + ) + try: + subprocess.run( + ["open", "itms-apps://itunes.apple.com/app/id497799835"], timeout=10 + ) + except Exception: + pass + + # Poll for installation (2 hours timeout) + timeout_seconds = 2 * 60 * 60 + interval = 10 + elapsed = 0 + + while elapsed < timeout_seconds: + await asyncio.sleep(interval) + elapsed += interval + + # Check if Xcode app exists + xcodebuild_path = shutil.which("xcodebuild") + if os.path.exists("/Applications/Xcode.app") and xcodebuild_path: + # Check license status + try: + license_result = subprocess.run( + [xcodebuild_path, "-checkFirstLaunchStatus"], + capture_output=True, + text=True, + timeout=15, + ) + + if license_result.returncode == 0: + await _send_status( + category, "installed", "Xcode installed successfully." + ) + return True + else: + # License not accepted, attempt to accept + return await _accept_license(category, user_password) + except Exception: + # Try to accept license anyway + return await _accept_license(category, user_password) + + await _send_status(category, "error", "Timed out waiting for Xcode installation.") + return False diff --git a/Framework/install_handler/macos/xcode.py b/Framework/install_handler/macos/xcode.py new file mode 100644 index 000000000..a940e0834 --- /dev/null +++ b/Framework/install_handler/macos/xcode.py @@ -0,0 +1,16 @@ +from .common import xcode_check_status, xcode_install + + +async def check_status() -> bool: + """Check if Xcode is installed and license is accepted.""" + print("[xcode] Checking status...") + + return await xcode_check_status("MacOS") + + + +async def install(user_password: str = "") -> bool: + """Install Xcode via App Store and accept license.""" + print("[xcode] Installing...") + + return await xcode_install("MacOS", user_password) diff --git a/Framework/install_handler/route.py b/Framework/install_handler/route.py index 88943dc60..56496af38 100644 --- a/Framework/install_handler/route.py +++ b/Framework/install_handler/route.py @@ -1,22 +1,25 @@ from pydantic import BaseModel, ConfigDict from typing import Literal, Optional -import platform from .web import chrome_for_testing, edge, mozilla -from .android import adb, node_js_22, appium, java, android_emulator, android_sdk, jdk, emulator -from .ios import xcode +from .android import ( + adb, + node_js_22, + appium, + java, + android_sdk, + jdk, +) +from .ios import xcode, simulator +from .macos import xcode as macos_xcode from .database import postgresql, mysql, mariadb, oracle from .windows import inspector from .android.emulator import android_emulator_install - -import httpx -from Framework.Utilities import RequestFormatter, ConfigModule, CommonUtil -import datetime -from Framework.install_handler.utils import debug +from .ios.simulator import ios_simulator_install services = [ { - "group":{ + "group": { "check_text": "check all", "install_text": "install all", }, @@ -29,7 +32,7 @@ "install_text": "install", "os": ["windows", "linux", "darwin"], "status_function": node_js_22.check_status, - "install_function": node_js_22.check_status, #on purpose. Node 22 is installed when node starts. + "install_function": node_js_22.check_status, # on purpose. Node 22 is installed when node starts. "user_password": "no", }, { @@ -39,7 +42,7 @@ "install_text": "install", "os": ["windows", "linux", "darwin"], "status_function": appium.check_status, - "install_function": appium.check_status, #on purpose. Appium is installed when node starts. + "install_function": appium.check_status, # on purpose. Appium is installed when node starts. "user_password": "no", }, { @@ -49,8 +52,8 @@ "install_text": "install", "os": ["windows", "linux", "darwin"], "status_function": java.check_status, - "install_function": java.install, #install jdk here also. jdk.install will install java also. - "user_password": "no" + "install_function": java.install, # install jdk here also. jdk.install will install java also. + "user_password": "no", }, { "name": "JDK", @@ -60,7 +63,7 @@ "os": ["windows", "linux", "darwin"], "status_function": jdk.check_status, "install_function": jdk.install, - "user_password": "no" + "user_password": "no", }, { "name": "Android SDK", @@ -70,7 +73,7 @@ "os": ["windows", "linux", "darwin"], "status_function": android_sdk.check_status, "install_function": android_sdk.install, - "user_password": "no" + "user_password": "no", }, { "name": "ADB", @@ -82,23 +85,35 @@ "install_function": adb.install, "user_password": "no", }, - ] + ], }, { - "group":{ + "group": { "check_text": "", "install_text": "", }, "category": "AndroidEmulator", - "name" : "System Images", + "name": "System Images", "install_text": "install", "install_function": android_emulator_install, "installables": [], "services": [], }, + { + "group": { + "check_text": "", + "install_text": "", + }, + "category": "iOSSimulator", + "name": "Device Types", + "install_text": "install", + "install_function": ios_simulator_install, + "installables": [], + "services": [], + }, { "category": "Web", - "group":{ + "group": { "check_text": "check all", "install_text": "install all", }, @@ -132,15 +147,14 @@ "status_function": edge.check_status, "install_function": edge.install, "user_password": "yes", - } - - ] + }, + ], }, { "category": "iOS", - "group":{ - "check_text": "", - "install_text": "", + "group": { + "check_text": "check all", + "install_text": "install all", }, "services": [ { @@ -151,13 +165,42 @@ "os": ["darwin"], "status_function": xcode.check_status, "install_function": xcode.install, - "user_password": "no" + "user_password": "yes", + }, + { + "name": "Simulator", + "status": "none", + "comment": "Simulator is a tool for managing Simulator devices.", + "install_text": "install", + "os": ["darwin"], + "status_function": simulator.check_status, + "install_function": simulator.install, + "user_password": "yes", + }, + ], + }, + { + "category": "MacOS", + "group": { + "check_text": "", + "install_text": "", + }, + "services": [ + { + "name": "Xcode", + "status": "none", + "comment": "Xcode is a tool for managing Xcode devices.", + "install_text": "install", + "os": ["darwin"], + "status_function": macos_xcode.check_status, + "install_function": macos_xcode.install, + "user_password": "yes", } - ] + ], }, { "category": "Database", - "group":{ + "group": { "check_text": "check all", "install_text": "install all", }, @@ -170,7 +213,7 @@ "os": ["windows", "linux", "darwin"], "status_function": postgresql.check_status, "install_function": postgresql.install, - "user_password": "no" + "user_password": "no", }, { "name": "MySQL", @@ -180,7 +223,7 @@ "os": ["windows", "linux", "darwin"], "status_function": mysql.check_status, "install_function": mysql.install, - "user_password": "no" + "user_password": "no", }, { "name": "MariaDB", @@ -190,7 +233,7 @@ "os": ["windows", "linux", "darwin"], "status_function": mariadb.check_status, "install_function": mariadb.install, - "user_password": "yes" + "user_password": "yes", }, { "name": "Oracle", @@ -200,13 +243,13 @@ "os": ["windows", "linux", "darwin"], "status_function": oracle.check_status, "install_function": oracle.install, - "user_password": "no" - } - ] + "user_password": "no", + }, + ], }, { "category": "Windows", - "group":{ + "group": { "check_text": "", "install_text": "", }, @@ -220,28 +263,36 @@ "os": ["windows"], "status_function": inspector.check_status, "install_function": inspector.install, - "user_password": "no" + "user_password": "no", } - ] - } + ], + }, ] class Item(BaseModel): - name: Optional[str] = None - category: str - user_password: str = "" # Optional user password for installations requiring sudo/admin + name: Optional[str] = None + category: str + user_password: str = ( + "" # Optional user password for installations requiring sudo/admin + ) class Value(BaseModel): - model_config = ConfigDict(extra='forbid') - - action: Literal["services_list", "install", "status", "system_info", "group_status", "group_install"] - item: Item | None = None + model_config = ConfigDict(extra="forbid") + + action: Literal[ + "services_list", + "install", + "status", + "system_info", + "group_status", + "group_install", + ] + item: Item | None = None class Response(BaseModel): - model_config = ConfigDict(extra='forbid') - - value: Value | None + model_config = ConfigDict(extra="forbid") + value: Value | None diff --git a/server/mobile.py b/server/mobile.py index 55dbc10ed..2f04967c7 100644 --- a/server/mobile.py +++ b/server/mobile.py @@ -2,7 +2,6 @@ import os import subprocess import base64 -import time from typing import Literal import asyncio @@ -32,6 +31,7 @@ class DeviceInfo(BaseModel): """Model for device information.""" serial: str status: str + name: str | None = None # model: str | None = None # product: str | None = None @@ -42,33 +42,34 @@ def get_devices(): # Get list of devices devices_output = run_adb_command(f"{ADB_PATH} devices -l") devices = [] - + # Parse adb devices output - for line in devices_output.split('\n')[1:]: # Skip first line (header) + index = 1 + for line in devices_output.split("\n")[1:]: # Skip first line (header) if line.strip(): parts = line.split() if len(parts) >= 2: serial = parts[0] status = parts[1] + name = f"device {index}" + index += 1 # model = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.model") # product = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.name") - devices.append(DeviceInfo( - serial=serial, - status=status - )) - + devices.append(DeviceInfo(serial=serial, status=status, name=name)) + return devices except Exception as e: return [] + @router.get("/inspect") -def inspect(): +def inspect(device_serial: str | None = None): """Get the Mobile DOM and screenshot.""" try: # Capture UI and screenshot - capture_ui_dump() - capture_screenshot() + capture_ui_dump(device_serial=device_serial) + capture_screenshot(device_serial=device_serial) # Read XML file with open(UI_XML_PATH, 'r') as xml_file: @@ -108,9 +109,12 @@ def run_adb_command(command): return f"Error: {e.stderr.strip()}" -def capture_ui_dump(): +def capture_ui_dump(device_serial: str | None = None): """Capture the current UI hierarchy from the device""" - out = run_adb_command(f"{ADB_PATH} shell uiautomator dump /sdcard/ui.xml") + device_flag = f"-s {device_serial}" if device_serial else "" + out = run_adb_command( + f"{ADB_PATH} {device_flag} shell uiautomator dump /sdcard/ui.xml".strip() + ) if out.startswith("Error:"): from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import appium_driver if appium_driver is None: @@ -119,14 +123,19 @@ def capture_ui_dump(): with open(UI_XML_PATH, 'w') as xml_file: xml_file.write(page_src) else: - out = run_adb_command(f"{ADB_PATH} pull /sdcard/ui.xml {UI_XML_PATH}") + out = run_adb_command( + f"{ADB_PATH} {device_flag} pull /sdcard/ui.xml {UI_XML_PATH}" + ) if out.startswith("Error:"): return -def capture_screenshot(): +def capture_screenshot(device_serial: str | None = None): """Capture the current UI hierarchy from the device""" - out = run_adb_command(f"{ADB_PATH} shell screencap -p /sdcard/screen.png") + device_flag = f"-s {device_serial}" if device_serial else "" + out = run_adb_command( + f"{ADB_PATH} {device_flag} shell screencap -p /sdcard/screen.png".strip() + ) if out.startswith("Error:"): from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import appium_driver if appium_driver is None: @@ -134,7 +143,9 @@ def capture_screenshot(): full_screenshot_path = os.path.join(os.getcwd(), SCREENSHOT_PATH) appium_driver.save_screenshot(full_screenshot_path) else: - out = run_adb_command(f"{ADB_PATH} pull /sdcard/screen.png {SCREENSHOT_PATH}") + out = run_adb_command( + f"{ADB_PATH} {device_flag} pull /sdcard/screen.png {SCREENSHOT_PATH}" + ) if out.startswith("Error:"): return