From 492d9a6b914ebc5be437aff6f34cb0181898f9c1 Mon Sep 17 00:00:00 2001 From: Stephen Shao Date: Wed, 4 Mar 2026 15:55:01 -0600 Subject: [PATCH] feat(k8s): align K8s perf reporting with Docker (multi_result + perf_super) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit K8s orchestrator now uses the same reporting path as single-node Docker when multiple_results CSV is present, so both produce the same artifacts: perf.csv, perf_entry.json, perf_entry.csv, perf_super.json, perf_entry_super.json/csv, and perf_super.csv. - Add Docker-compatible reporting path in _collect_results: - _build_common_info_dict() for common_info (args, gpu_arch, etc.) - _ensure_perf_csv_exists() so update_perf_csv can read perf.csv - Call update_perf_csv, update_perf_super_json, update_perf_super_csv with scripts_base_dir so --config from models.json resolves correctly - Multi-node multiple_results: resolve one CSV path per run - _resolve_multiple_results_csv(): single pod → use that CSV; multi-pod → merge all pod CSVs with sum/average rules - _merge_multi_node_multiple_results_csv(): align rows by index; performance aggregated by metric type (throughput→sum, latency→avg, memory→max); extra columns by _aggregation_for_extra_column (sum/avg/max/first) - _aggregation_for_extra_column() for consistent multi-node semantics - Keep legacy row-by-row _write_to_perf_csv when reporting module unavailable; record failure when no CSV found - job.yaml.j2: no functional change required; existing copy block and find fallback for multiple_results already support this refactor --- src/madengine/deployment/kubernetes.py | 529 +++++++++++++++--- .../templates/kubernetes/job.yaml.j2 | 44 +- 2 files changed, 475 insertions(+), 98 deletions(-) diff --git a/src/madengine/deployment/kubernetes.py b/src/madengine/deployment/kubernetes.py index 82e477bb..f3ed2223 100644 --- a/src/madengine/deployment/kubernetes.py +++ b/src/madengine/deployment/kubernetes.py @@ -42,6 +42,13 @@ from madengine.core.errors import ConfigurationError, create_error_context from madengine.utils.gpu_config import resolve_runtime_gpus +try: + from madengine.reporting.update_perf_csv import update_perf_csv + from madengine.reporting.update_perf_super import update_perf_super_json, update_perf_super_csv + REPORTING_AVAILABLE = True +except ImportError: + REPORTING_AVAILABLE = False + # Valid distributed launchers VALID_LAUNCHERS = [ @@ -708,40 +715,29 @@ def _prepare_template_context( model_script_dir = str(script_file.parent) # e.g., "scripts/dummy" model_script_filename = script_file.name # e.g., "run_data_minio.sh" - # Load ALL scripts from the model's scripts directory - # This is critical for models that have multiple helper scripts + # Bundle entire scripts/ directory recursively for reliability across + # different model types (vllm, sglang, etc.) with varying file types and subdirs scripts_dir_path = Path(model_script_dir) if scripts_dir_path.exists() and scripts_dir_path.is_dir(): - for script in scripts_dir_path.glob("*.sh"): - with open(script, "r") as f: - # Use the path directly if relative, otherwise convert to relative - if script.is_absolute(): - relative_path = str(script.relative_to(Path.cwd())) - else: - relative_path = str(script) - model_scripts_contents[relative_path] = f.read() - - # Also check for Python scripts - for script in scripts_dir_path.glob("*.py"): - with open(script, "r") as f: - # Use the path directly if relative, otherwise convert to relative - if script.is_absolute(): - relative_path = str(script.relative_to(Path.cwd())) - else: - relative_path = str(script) - model_scripts_contents[relative_path] = f.read() - - # Also check for JSON config files (e.g., DeepSpeed configs) - for script in scripts_dir_path.glob("*.json"): - with open(script, "r") as f: - # Use the path directly if relative, otherwise convert to relative - if script.is_absolute(): - relative_path = str(script.relative_to(Path.cwd())) - else: - relative_path = str(script) - model_scripts_contents[relative_path] = f.read() - - self.console.print(f"[dim]Loaded {len(model_scripts_contents)} script(s) from {model_script_dir}[/dim]") + cwd = Path.cwd() + for f in scripts_dir_path.rglob("*"): + if not f.is_file(): + continue + try: + content = f.read_text(encoding="utf-8", errors="strict") + except (UnicodeDecodeError, OSError): + # Skip binary or unreadable files (ConfigMap is text-only) + self.console.print( + f"[dim]Skipping non-text file: {f.relative_to(scripts_dir_path)}[/dim]" + ) + continue + relative_path = ( + str(f.relative_to(cwd)) if f.is_absolute() else str(f) + ) + model_scripts_contents[relative_path] = content + self.console.print( + f"[dim]Loaded {len(model_scripts_contents)} file(s) from {model_script_dir}[/dim]" + ) elif script_file.exists(): # Fallback: load single file if directory doesn't exist with open(script_file, "r") as f: @@ -938,12 +934,13 @@ def _prepare_template_context( create_headless_service = True self.console.print(f"[dim]Multi-node vLLM: Creating headless service for Ray cluster[/dim]") - # Generate vLLM launcher command + # Generate vLLM launcher command (pass model args so run.sh gets --model_repo etc.) launcher_command = self._generate_vllm_command( nnodes=nnodes, nproc_per_node=nproc_per_node, master_port=master_port, - model_script=model_info.get("scripts", "run.sh") + model_script=model_info.get("scripts", "run.sh"), + model_args=model_info.get("args", ""), ) elif launcher_type == "sglang": @@ -951,12 +948,13 @@ def _prepare_template_context( create_headless_service = True self.console.print(f"[dim]Multi-node SGLang: Creating headless service for Ray cluster[/dim]") - # Generate SGLang launcher command + # Generate SGLang launcher command (pass model args so run.sh gets CLI args) launcher_command = self._generate_sglang_command( nnodes=nnodes, nproc_per_node=nproc_per_node, master_port=master_port, - model_script=model_info.get("scripts", "run.sh") + model_script=model_info.get("scripts", "run.sh"), + model_args=model_info.get("args", ""), ) elif launcher_type == "sglang-disagg" or launcher_type == "sglang_disagg": @@ -1777,7 +1775,12 @@ def _generate_sglang_disagg_command( """ def _generate_vllm_command( - self, nnodes: int, nproc_per_node: int, master_port: int, model_script: str + self, + nnodes: int, + nproc_per_node: int, + master_port: int, + model_script: str, + model_args: str = "", ) -> str: """ Generate vLLM launcher command for K8s Indexed Jobs. @@ -1802,6 +1805,7 @@ def _generate_vllm_command( nproc_per_node: GPUs per node. Must be >= 1. master_port: Master communication port (for Ray). Must be 1-65535. model_script: Path to model's run script. Cannot be empty. + model_args: CLI args for the script (e.g. --model_repo openai/gpt-oss-20b). Returns: Complete vLLM launch setup with environment configuration @@ -1818,6 +1822,11 @@ def _generate_vllm_command( raise ValueError(f"master_port must be 1-65535, got {master_port}") if not model_script or not isinstance(model_script, str): raise ValueError(f"model_script must be non-empty string, got {model_script}") + + # Run script from its directory so relative paths (run_vllm.py, configs/) resolve + script_dir = str(Path(model_script).parent) + script_name = Path(model_script).name + run_cmd = f"cd /workspace/{script_dir} && bash {script_name} {model_args}".strip() # For single-node, simple TP setup (no Ray needed) if nnodes == 1: @@ -1835,8 +1844,8 @@ def _generate_vllm_command( echo " Distributed Backend: auto (no Ray)" echo " Total GPUs: {nproc_per_node}" -# vLLM handles process management - just run the script -{model_script}""" +# vLLM handles process management - run script from its directory so run_vllm.py/configs resolve +{run_cmd}""" # Multi-node: Data Parallelism with independent Ray clusters per pod return f"""# vLLM multi-node setup (K8s Data Parallelism Mode) @@ -1881,14 +1890,19 @@ def _generate_vllm_command( echo "Ray cluster ready:" ray status -# Run vLLM inference script -{model_script} +# Run vLLM inference script from its directory so run_vllm.py/configs resolve +{run_cmd} # Cleanup Ray on exit trap "ray stop --force 2>/dev/null || true" EXIT""" def _generate_sglang_command( - self, nnodes: int, nproc_per_node: int, master_port: int, model_script: str + self, + nnodes: int, + nproc_per_node: int, + master_port: int, + model_script: str, + model_args: str = "", ) -> str: """ Generate SGLang launcher command for K8s Indexed Jobs. @@ -1912,6 +1926,7 @@ def _generate_sglang_command( nproc_per_node: GPUs per node. Must be >= 1. master_port: Master communication port (for NCCL/Ray). Must be 1-65535. model_script: Path to model's run script. Cannot be empty. + model_args: CLI args for the script (e.g. --model_repo ...). Returns: Complete SGLang launch setup with environment configuration @@ -1928,7 +1943,12 @@ def _generate_sglang_command( raise ValueError(f"master_port must be 1-65535, got {master_port}") if not model_script or not isinstance(model_script, str): raise ValueError(f"model_script must be non-empty string, got {model_script}") - + + # Run script from its directory so relative paths resolve; pass model args + script_dir = str(Path(model_script).parent) + script_name = Path(model_script).name + run_cmd = f"cd /workspace/{script_dir} && bash {script_name} {model_args}".strip() + # For single-node, simple TP setup if nnodes == 1: return f"""# SGLang single-node setup (Tensor Parallelism) @@ -1943,8 +1963,8 @@ def _generate_sglang_command( echo " Total GPUs: {nproc_per_node}" # SGLang native launcher handles everything -{model_script}""" - +{run_cmd}""" + # Multi-node: Use SGLang's native multi-node support return f"""# SGLang multi-node setup (K8s Indexed Job) export MASTER_ADDR="{self.job_name}-0.{self.job_name}.{self.namespace}.svc.cluster.local" @@ -1980,7 +2000,7 @@ def _generate_sglang_command( export NCCL_INIT_ADDR="${{MASTER_ADDR}}:${{MASTER_PORT}}" echo "Starting SGLang with native multi-node launcher..." -{model_script} +{run_cmd} # Cleanup Ray on exit trap "ray stop --force 2>/dev/null || true" EXIT""" @@ -2274,13 +2294,27 @@ def _create_results_pvc(self) -> str: storage_class=self.k8s_config.get("storage_class") ) - # Create PVC + # Create PVC (retry on 409 "object is being deleted" until it is gone) pvc_dict = yaml.safe_load(pvc_yaml) - self.core_v1.create_namespaced_persistent_volume_claim( - namespace=self.namespace, body=pvc_dict - ) - - return pvc_name + max_create_retries = 6 + create_wait_seconds = 5 + for attempt in range(max_create_retries): + try: + self.core_v1.create_namespaced_persistent_volume_claim( + namespace=self.namespace, body=pvc_dict + ) + return pvc_name + except ApiException as e: + if e.status == 409 and e.body and "object is being deleted" in (e.body or ""): + if attempt < max_create_retries - 1: + self.console.print( + f"[dim]PVC still terminating, waiting {create_wait_seconds}s before retry ({attempt + 1}/{max_create_retries})[/dim]" + ) + time.sleep(create_wait_seconds) + else: + raise + else: + raise def _create_or_get_data_pvc(self, nnodes: int = 1) -> str: """ @@ -2422,7 +2456,6 @@ def _cleanup_existing_resources(self): ) self.console.print(f"[dim]Deleted existing collector pod: {collector_pod_name}[/dim]") # Wait a moment for pod to release the PVC - import time time.sleep(2) except ApiException as e: if e.status != 404: @@ -2438,8 +2471,7 @@ def _cleanup_existing_resources(self): self.console.print(f"[dim]Deleted existing PVC: {pvc_name}[/dim]") # Wait for PVC to be fully deleted (not just marked for deletion) - import time - max_wait = 30 # Maximum 30 seconds + max_wait = 90 # Maximum 90 seconds (PV can take time to detach) wait_interval = 1 # Check every 1 second for i in range(max_wait): try: @@ -2447,6 +2479,10 @@ def _cleanup_existing_resources(self): name=pvc_name, namespace=self.namespace ) + if i > 0 and i % 10 == 0: + self.console.print( + f"[dim]Waiting for PVC {pvc_name} to be removed... ({i}s)[/dim]" + ) time.sleep(wait_interval) except ApiException as e: if e.status == 404: @@ -2457,7 +2493,6 @@ def _cleanup_existing_resources(self): pass # Wait a moment for other resources to be deleted - import time time.sleep(1) def deploy(self) -> DeploymentResult: @@ -2595,8 +2630,6 @@ def _monitor_status_only(self, deployment_id: str) -> DeploymentResult: def _monitor_with_live_logs(self, deployment_id: str) -> DeploymentResult: """Monitor Job and stream logs in real-time.""" - import time - self.console.print(f"\n[cyan]═══ Streaming pod logs (--live-output) ═══[/cyan]\n") pod_name = None @@ -2951,34 +2984,114 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: f"[green]✓ Updated local perf.csv[/green]" ) else: - # No performance from log: try multiple_results CSV (same contract as local execution) - # Single-node multi-row: write one perf.csv row per CSV row (no multinode aggregation) - fallback_metrics = self._parse_multiple_results_from_artifacts( - results_dir, results, model_info, build_info + # No performance from log: try multiple_results CSV (same contract as local Docker) + # Resolve single CSV path (one pod) or merged CSV path (multi-pod with sum/avg rules) + resolved_csv_path = self._resolve_multiple_results_csv( + results_dir, results, model_info ) - if fallback_metrics: - for item in fallback_metrics: - record = self._create_multiple_result_row_record( - model_info, build_info, deployment_id, item - ) - if record: - self._write_to_perf_csv(record) - # Use nodes=[] so display shows perf_data.performance/metric per row - # (not pod-level nodes whose perf came from log and is empty for multiple_results) - results["successful_runs"].append({ - "model": item["model"], - "perf_data": record, - "nodes": [], - "per_node_metrics": [item], - }) + if resolved_csv_path and REPORTING_AVAILABLE: + # Docker-compatible flow: produce perf.csv, perf_entry.*, perf_super.* + gpu_arch = "N/A" + if results.get("logs"): + import re + log_content = results["logs"][0].get("log", "") + m = re.search(r"(?:🔹\s*)?Name\s*:\s*(gfx\w+)", log_content) + if m: + gpu_arch = m.group(1) + self._ensure_perf_csv_exists() + common_info = self._build_common_info_dict( + model_info, build_info, deployment_id, gpu_arch + ) + common_info_path = Path("common_info.json") + with open(common_info_path, "w", encoding="utf-8") as f: + json.dump(common_info, f, indent=2) + update_perf_csv( + perf_csv="perf.csv", + multiple_results=str(resolved_csv_path), + common_info=str(common_info_path), + model_name=model_info.get("name", ""), + ) + scripts_path = model_info.get("scripts", "") + scripts_base_dir = os.path.dirname(scripts_path) if scripts_path else None + num_entries = update_perf_super_json( + perf_super_json="perf_super.json", + multiple_results=str(resolved_csv_path), + common_info=str(common_info_path), + model_name=model_info.get("name", ""), + scripts_base_dir=scripts_base_dir, + ) + update_perf_super_csv( + perf_super_json="perf_super.json", + perf_super_csv="perf_super.csv", + num_entries=num_entries, + ) + # Build successful_runs for display (one entry per CSV row) + import csv as _csv + model_name = model_info.get("name", "") + with open(resolved_csv_path, "r", encoding="utf-8", errors="ignore") as f: + reader = _csv.DictReader(f) + for row in reader: + row = {k.strip(): v for k, v in row.items() if k} + if row.get("performance") and row.get("metric"): + display_model = f"{model_name}_{row.get('model', '')}" + record = self._create_multiple_result_row_record( + model_info, build_info, deployment_id, + { + "model": display_model, + "performance": row.get("performance"), + "metric": row.get("metric", ""), + "gpu_architecture": gpu_arch, + "duration": row.get("test_duration", "N/A"), + }, + ) + if record: + results["successful_runs"].append({ + "model": display_model, + "perf_data": record, + "nodes": [], + "per_node_metrics": [{"model": display_model, "performance": row.get("performance"), "metric": row.get("metric", "")}], + }) self.console.print( - f"[green]✓ Wrote {len(fallback_metrics)} row(s) from multiple_results to perf.csv[/green]" + f"[green]✓ Updated perf.csv, perf_entry.*, perf_super.* (Docker-compatible)[/green]" + ) + elif resolved_csv_path and not REPORTING_AVAILABLE: + # Fallback when reporting module not available: legacy row-by-row write + fallback_metrics = self._parse_multiple_results_from_artifacts( + results_dir, results, model_info, build_info + ) + if fallback_metrics: + for item in fallback_metrics: + record = self._create_multiple_result_row_record( + model_info, build_info, deployment_id, item + ) + if record: + self._write_to_perf_csv(record) + results["successful_runs"].append({ + "model": item["model"], + "perf_data": record, + "nodes": [], + "per_node_metrics": [item], + }) + self.console.print( + f"[green]✓ Wrote {len(fallback_metrics)} row(s) from multiple_results to perf.csv[/green]" + ) + if not resolved_csv_path: + # No multiple_results CSV found: record failure + error_msg = "No performance metrics found from any node" + failure_record = self._create_failure_record( + model_info, build_info, deployment_id, error_msg ) + self._write_to_perf_csv(failure_record) + results["failed_runs"].append({ + "model": model_info.get("name", "Unknown"), + "error": error_msg, + "nodes": results["nodes"] + }) self.console.print( - f"[green]✓ Updated local perf.csv[/green]" + f"[yellow]⚠ No performance metrics found, recorded as FAILED[/yellow]" ) - if not fallback_metrics: - # No log metric and no multiple_results CSV: record failure + elif resolved_csv_path and not REPORTING_AVAILABLE and not results.get("successful_runs"): + # Legacy path ran but produced no valid rows error_msg = "No performance metrics found from any node" failure_record = self._create_failure_record( model_info, build_info, deployment_id, error_msg @@ -3410,7 +3523,75 @@ def _create_failure_record(self, model_info: Dict, build_info: Dict, pod_name: s result["tags"] = ",".join(str(item) for item in result["tags"]) return result - + + # Standard perf.csv header (must match container_runner.ensure_perf_csv_exists) + _PERF_CSV_HEADER = ( + "model,n_gpus,nnodes,gpus_per_node,training_precision,pipeline,args,tags," + "docker_file,base_docker,docker_sha,docker_image,git_commit,machine_name," + "deployment_type,launcher,gpu_architecture,performance,metric,relative_change," + "status,build_duration,test_duration,dataname,data_provider_type,data_size," + "data_download_duration,build_number,additional_docker_run_options" + ) + + def _ensure_perf_csv_exists(self) -> None: + """Ensure perf.csv exists with standard header (same as Docker container_runner).""" + perf_csv_path = Path("perf.csv") + if not perf_csv_path.exists(): + perf_csv_path.write_text(self._PERF_CSV_HEADER + "\n", encoding="utf-8") + self.console.print("[dim]Created perf.csv with standard header[/dim]") + + def _build_common_info_dict( + self, + model_info: Dict, + build_info: Dict, + deployment_id: str, + gpu_architecture: str = "", + ) -> Dict: + """ + Build common_info dict for update_perf_csv / update_perf_super (Docker-compatible). + Same shape as container_runner create_run_details_dict; model/performance/metric + are omitted so they are filled from the multiple_results CSV. + """ + deployment_config = self.manifest.get("deployment_config", {}) + distributed_config = deployment_config.get("distributed", {}) + nnodes = distributed_config.get("nnodes", 1) + nproc_per_node = distributed_config.get("nproc_per_node") + if nproc_per_node is None: + nproc_per_node = int(model_info.get("n_gpus", 1)) + total_gpus = nnodes * nproc_per_node + gpus_per_node = str(nproc_per_node) + nnodes_str = str(nnodes) + result = { + "n_gpus": str(total_gpus), + "nnodes": nnodes_str, + "gpus_per_node": gpus_per_node, + "training_precision": model_info.get("training_precision", ""), + "pipeline": os.environ.get("pipeline", ""), + "args": model_info.get("args", ""), + "tags": model_info.get("tags", ""), + "docker_file": build_info.get("dockerfile", ""), + "base_docker": build_info.get("base_docker", ""), + "docker_sha": build_info.get("docker_sha", ""), + "docker_image": build_info.get("docker_image", ""), + "git_commit": "", + "machine_name": deployment_id, + "deployment_type": "kubernetes", + "launcher": "native", + "gpu_architecture": gpu_architecture, + "relative_change": "", + "build_duration": build_info.get("build_duration", ""), + "test_duration": "", + "dataname": model_info.get("data", ""), + "data_provider_type": "", + "data_size": "", + "data_download_duration": "", + "build_number": os.environ.get("BUILD_NUMBER", "0"), + "additional_docker_run_options": model_info.get("additional_docker_run_options", ""), + } + if isinstance(result["tags"], list): + result["tags"] = ",".join(str(t) for t in result["tags"]) + return result + def _create_multiple_result_row_record( self, model_info: Dict, @@ -3552,9 +3733,7 @@ def _parse_multiple_results_from_artifacts( """ import csv as csv_module multiple_results_file = model_info.get("multiple_results") - if not multiple_results_file: - return [] - filename = Path(multiple_results_file).name + filename = Path(multiple_results_file).name if multiple_results_file else None # Try to get gpu_architecture from first pod log gpu_arch = "N/A" if results.get("logs"): @@ -3570,8 +3749,12 @@ def _parse_multiple_results_from_artifacts( local_path = Path(art.get("local_path", "")) if not local_path.is_dir(): continue - csv_path = local_path / filename - if not csv_path.is_file(): + # Prefer exact filename (same as Docker multiple_results); fallback to any perf_*.csv + csv_path = (local_path / filename) if filename else None + if not csv_path or not csv_path.is_file(): + perf_csvs = sorted(local_path.glob("perf_*.csv")) + csv_path = perf_csvs[0] if perf_csvs else None + if not csv_path or not csv_path.is_file(): continue try: with open(csv_path, "r", encoding="utf-8", errors="ignore") as f: @@ -3603,15 +3786,181 @@ def _parse_multiple_results_from_artifacts( }) if parsed_list: self.console.print( - f"[green] ✓ Parsed performance from {filename} ({len(parsed_list)} row(s))[/green]" + f"[green] ✓ Parsed performance from {csv_path.name} ({len(parsed_list)} row(s))[/green]" ) return parsed_list except Exception as e: self.console.print( - f"[dim] Could not parse {filename} from PVC: {e}[/dim]" + f"[dim] Could not parse {csv_path.name} from PVC: {e}[/dim]" ) return [] - + + def _aggregation_for_extra_column(self, column_name: str) -> str: + """ + Return how to aggregate an extra CSV column when merging multi-node results. + Best practice: throughput/counts -> sum; latencies/utilization -> average; + duration/capacity -> max; identifiers -> first. + """ + col = column_name.lower().strip() + # Sum: counts, totals, throughput-like + if any(k in col for k in [ + "count", "total", "samples", "tokens", "throughput", + "requests", "images", "bandwidth", "ops" + ]): + return "sum" + # Average: rates per unit, utilization, ratios + if any(k in col for k in [ + "utilization", "usage", "percent", "ratio", "latency", + "time_ms", "ttft", "tpot", "accuracy", "loss" + ]): + return "average" + # Max: duration (slowest node), memory, capacity + if any(k in col for k in [ + "duration", "time", "seconds", "memory", "bytes", "mb", "gb" + ]): + return "max" + return "first" + + def _merge_multi_node_multiple_results_csv( + self, csv_paths: List[Path], output_path: Path + ) -> bool: + """ + Merge multiple pod multiple_results CSVs into one with sum/average rules. + Rows are aligned by index (row 0 from each pod -> one merged row 0). + - performance: aggregated by _determine_aggregation_method(metric) (sum or average). + - Other numeric columns: by _aggregation_for_extra_column (sum/average/max). + - model, metric: taken from first CSV. + """ + import csv as csv_module + import statistics + + required = ["model", "performance", "metric"] + rows_by_index: Dict[int, List[Dict]] = {} + + for path in csv_paths: + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + reader = csv_module.DictReader(f) + fieldnames = [c.strip() for c in (reader.fieldnames or [])] + if not all(h in fieldnames for h in required): + continue + for idx, row in enumerate(reader): + row = {k.strip(): v for k, v in row.items() if k} + if not row.get("performance") or not row.get("metric"): + continue + try: + float(str(row["performance"]).strip()) + except (ValueError, TypeError): + continue + if idx not in rows_by_index: + rows_by_index[idx] = [] + rows_by_index[idx].append(row) + except Exception as e: + self.console.print(f"[dim] Could not read {path.name}: {e}[/dim]") + continue + + if not rows_by_index: + return False + + # Build union of columns (required first, then rest) + extra_cols = set() + for group in rows_by_index.values(): + for row in group: + extra_cols.update(k for k in row if k not in required) + all_columns = list(required) + sorted(extra_cols) + merged_rows = [] + for idx in sorted(rows_by_index.keys()): + group = rows_by_index[idx] + first = group[0] + metric_name = (first.get("metric") or "").strip() + perf_agg = self._determine_aggregation_method(metric_name) + perf_values = [] + for r in group: + try: + perf_values.append(float(str(r.get("performance", "")).strip())) + except (ValueError, TypeError): + pass + if not perf_values: + continue + if perf_agg == "sum": + performance = sum(perf_values) + elif perf_agg == "average": + performance = statistics.mean(perf_values) + elif perf_agg == "max": + performance = max(perf_values) + else: + performance = sum(perf_values) + merged = { + "model": first.get("model", ""), + "performance": performance, + "metric": first.get("metric", ""), + } + for col in all_columns: + if col in merged: + continue + values = [r.get(col) for r in group] + try: + nums = [float(str(v).strip()) for v in values if v is not None and str(v).strip()] + except (ValueError, TypeError): + nums = [] + if nums: + extra_agg = self._aggregation_for_extra_column(col) + if extra_agg == "sum": + merged[col] = sum(nums) + elif extra_agg == "average": + merged[col] = statistics.mean(nums) + elif extra_agg == "max": + merged[col] = max(nums) + else: + merged[col] = first.get(col, "") + else: + merged[col] = first.get(col, "") + merged_rows.append(merged) + + if not merged_rows: + return False + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", newline="", encoding="utf-8") as f: + writer = csv_module.DictWriter(f, fieldnames=all_columns, extrasaction="ignore") + writer.writeheader() + writer.writerows(merged_rows) + self.console.print( + f"[green] ✓ Merged {len(csv_paths)} pod CSV(s) into {len(merged_rows)} row(s) → {output_path.name}[/green]" + ) + return True + + def _resolve_multiple_results_csv( + self, results_dir: Path, results: Dict, model_info: Dict + ) -> Optional[Path]: + """ + Resolve path to a single multiple_results CSV for update_perf_csv. + Single pod: return that CSV path. Multi-pod: merge all pod CSVs with + sum/average rules and return path to merged file. + """ + multiple_results_file = model_info.get("multiple_results") + filename = Path(multiple_results_file).name if multiple_results_file else None + csv_paths: List[Path] = [] + for art in results.get("artifacts", []): + if art.get("type") != "pvc_collection": + continue + local_path = Path(art.get("local_path", "")) + if not local_path.is_dir(): + continue + csv_path = (local_path / filename) if filename else None + if not csv_path or not csv_path.is_file(): + perf_csvs = sorted(local_path.glob("perf_*.csv")) + csv_path = perf_csvs[0] if perf_csvs else None + if csv_path and csv_path.is_file(): + csv_paths.append(csv_path) + if not csv_paths: + return None + if len(csv_paths) == 1: + return csv_paths[0] + merged_path = results_dir / "multiple_results_merged.csv" + if self._merge_multi_node_multiple_results_csv(csv_paths, merged_path): + return merged_path + return csv_paths[0] + def _determine_aggregation_method(self, metric_name: str) -> str: """ Determine how to aggregate a metric based on its name/type. diff --git a/src/madengine/deployment/templates/kubernetes/job.yaml.j2 b/src/madengine/deployment/templates/kubernetes/job.yaml.j2 index 7d98a07b..3a9e06f9 100644 --- a/src/madengine/deployment/templates/kubernetes/job.yaml.j2 +++ b/src/madengine/deployment/templates/kubernetes/job.yaml.j2 @@ -329,9 +329,26 @@ spec: echo "✓ Copied perf.csv" fi {% if multiple_results %} - if [ -f "{{ multiple_results }}" ]; then + # Copy multiple_results CSV (same contract as Docker: file may be in CWD or parent; use absolute paths for reliability) + if [ -f "/workspace/{{ multiple_results }}" ]; then + cp "/workspace/{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }}" + elif [ -f "/workspace/vllm/{{ multiple_results }}" ]; then + cp "/workspace/vllm/{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }}" + elif [ -f "{{ multiple_results }}" ]; then cp "{{ multiple_results }}" /results/${HOSTNAME}/ echo "✓ Copied {{ multiple_results }}" + elif [ -f "../{{ multiple_results }}" ]; then + cp "../{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }} (from parent dir)" + else + # Fallback: find anywhere under /workspace (e.g. scripts/dummy/perf_dummy.csv for dummy_multi) + SRCFILE=$(find /workspace -maxdepth 4 -name "{{ multiple_results }}" 2>/dev/null | head -1) + if [ -n "$SRCFILE" ] && [ -f "$SRCFILE" ]; then + cp "$SRCFILE" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }} (from workspace)" + fi fi {% endif %} @@ -496,9 +513,26 @@ spec: echo "✓ Copied perf.csv" fi {% if multiple_results %} - if [ -f "{{ multiple_results }}" ]; then + # Copy multiple_results CSV (same contract as Docker: file may be in CWD or parent; use absolute paths for reliability) + if [ -f "/workspace/{{ multiple_results }}" ]; then + cp "/workspace/{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }}" + elif [ -f "/workspace/vllm/{{ multiple_results }}" ]; then + cp "/workspace/vllm/{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }}" + elif [ -f "{{ multiple_results }}" ]; then cp "{{ multiple_results }}" /results/${HOSTNAME}/ echo "✓ Copied {{ multiple_results }}" + elif [ -f "../{{ multiple_results }}" ]; then + cp "../{{ multiple_results }}" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }} (from parent dir)" + else + # Fallback: find anywhere under /workspace (e.g. scripts/dummy/perf_dummy.csv for dummy_multi) + SRCFILE=$(find /workspace -maxdepth 4 -name "{{ multiple_results }}" 2>/dev/null | head -1) + if [ -n "$SRCFILE" ] && [ -f "$SRCFILE" ]; then + cp "$SRCFILE" /results/${HOSTNAME}/ + echo "✓ Copied {{ multiple_results }} (from workspace)" + fi fi {% endif %} @@ -525,12 +559,6 @@ spec: fi done - # Copy tool-specific outputs - if ls -d *_output 1> /dev/null 2>&1; then - cp -r *_output /results/${HOSTNAME}/ 2>/dev/null || true - echo "✓ Copied tool output directories" - fi - # Copy GPU profiler outputs if ls gpu_info_*.csv 1> /dev/null 2>&1; then cp gpu_info_*.csv /results/${HOSTNAME}/