diff --git a/examples/k8s-configs/README.md b/examples/k8s-configs/README.md
index 4963eb00..4b1bd006 100644
--- a/examples/k8s-configs/README.md
+++ b/examples/k8s-configs/README.md
@@ -373,21 +373,45 @@ kubectl exec -it <pod-name> -- ls -lh /data/
 | **NAS** | SSH/rsync | NAS credentials in `credential.json` |
 | **Local** | Filesystem | Pre-mounted PVC |
 
-### Storage Classes
+### Storage Classes (local-path vs NFS)
+
+madengine separates **per-job results** from **long-lived shared data**:
+
+| Volume | Typical use | Single-node (`nnodes: 1`) | Multi-node (`nnodes > 1`) |
+|--------|-------------|----------------------------|----------------------------|
+| **`{job}-results`** | Benchmark artifacts (`/results`) | **RWO** — `local_path_storage_class` or `single_node_results_storage_class` (e.g. `local-path`) | **RWX** — `nfs_storage_class` or `multi_node_results_storage_class` (e.g. `nfs-banff`) |
+| **`madengine-shared-data`** | Dataset cache (`/data`) | **RWX** — always `ReadWriteMany` + NFS class | Same PVC |
+
+**Built-in defaults (Banff-oriented)** are in `presets/k8s/defaults.json`: `nfs_storage_class` / `data_storage_class` → `nfs-banff`, `local_path_storage_class` → `local-path`, `recreate_shared_data_pvc` → `false`. You do not need to set these unless you run on a different cluster; in that case, override them in the additional context.
+
+Example override for a different cluster:
+
+```json
+{
+  "k8s": {
+    "nfs_storage_class": "nfs-client",
+    "local_path_storage_class": "standard",
+    "data_storage_class": "nfs-client"
+  }
+}
+```
+
+- **`nfs_storage_class`**: RWX class (e.g. `nfs-banff`) — used for shared-data (together with `data_storage_class`) and for multi-node results unless overridden.
+- **`local_path_storage_class`**: RWO class for the **single-node only** results PVC.
+- **`data_storage_class`**: Optional override for `madengine-shared-data` only (defaults to `nfs_storage_class`, then `storage_class`).
+- **`single_node_results_storage_class`** / **`multi_node_results_storage_class`**: Optional fine-grained overrides for the results PVCs.
+- **`recreate_shared_data_pvc`**: If `true`, deletes the existing `madengine-shared-data` PVC before creating a new one (**destroys data** — back up first). Use when migrating from RWO `local-path` to RWX NFS. The lookup order of these keys is sketched below.
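+
+The effective storage-class resolution mirrors `_k8s_results_storage_class` and
+`_k8s_data_storage_class` in `kubernetes.py`; a minimal sketch (the helper name
+`results_storage_class` is illustrative, not part of the API):
+
+```python
+def results_storage_class(cfg: dict, nnodes: int):
+    """First non-empty key wins; None means the cluster default StorageClass."""
+    if nnodes > 1:
+        keys = ("multi_node_results_storage_class", "nfs_storage_class", "storage_class")
+    else:
+        keys = ("single_node_results_storage_class", "local_path_storage_class", "storage_class")
+    return next((cfg[k] for k in keys if cfg.get(k)), None)
+```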
+
+**Single-Node (RWO)** (results only):
 
-**Single-Node (RWO)**:
 - ✅ `local-path` (Rancher)
 - ✅ AWS EBS (`gp3`, `io2`)
-- ✅ Azure Disk
-- ✅ Any RWO storage class
 
-**Multi-Node (RWX)**:
-- ✅ NFS (`nfs-client`)
-- ✅ CephFS
-- ✅ GlusterFS
-- ✅ AWS EFS
-- ✅ Azure Files
-- ❌ `local-path` (RWO only)
+**Multi-node & shared-data (RWX)**:
+
+- ✅ NFS (e.g. `nfs-banff`, `nfs-client`)
+- ✅ CephFS, GlusterFS, AWS EFS, Azure Files
+- ❌ `local-path` (RWO only — not for shared-data or multi-node results)
 
 ### Custom PVC (Optional)
 
@@ -513,6 +537,13 @@ To use an existing PVC instead of auto-creation:
 |-------|------|---------|-------------|
 | `data_pvc` | string | `null` | Data PVC name (auto-created if using data provider) |
 | `results_pvc` | string | `null` | Results PVC name (auto-created by default) |
+| `storage_class` | string | `null` | Optional fallback if the keys below are unset |
+| `nfs_storage_class` | string | **`nfs-banff`** (preset) | RWX class for shared-data / multi-node results |
+| `local_path_storage_class` | string | **`local-path`** (preset) | RWO class for the single-node `{job}-results` PVC |
+| `data_storage_class` | string | **`nfs-banff`** (preset) | Overrides the storage class for shared-data only |
+| `single_node_results_storage_class` | string | `null` | Overrides the single-node results storage class (`local_path_storage_class` if unset) |
+| `multi_node_results_storage_class` | string | `null` | Overrides the multi-node results storage class (`nfs_storage_class` if unset) |
+| `recreate_shared_data_pvc` | boolean | **`false`** (preset) | If `true`, deletes `madengine-shared-data` before creation (data loss) |
 
 #### Distributed Execution Fields
 
@@ -594,6 +625,18 @@ host_ipc: true
 PVCs: Recommended for data and results
 ```
 
+### Local `k8s_results` layout (after `madengine run`)
+
+Artifacts are written under `./k8s_results/<job_name>/`:
+
+| Path | Contents |
+|------|----------|
+| `<job>/<pod>/pod.log` | Container log from the Kubernetes API |
+| `<job>/<pod>/pvc/` | Copy of `/results/<subdir>/` from the results PVC, matched to that pod |
+| `<job>/pvc_unmapped/<subdir>/` | PVC folders that could not be matched to a pod name |
+
+Write durable outputs under `/results/<subdir>/` in the container so each replica's files land in a predictable PVC subdirectory (e.g. the hostname or `jobname-<index>`). madengine maps that subdir to the full pod name when copying to the host, as in the sketch below.
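+
+A minimal sketch of the write side (hypothetical script; assumes the container's
+`HOSTNAME` equals the pod name, which is the Kubernetes default for Job pods):
+
+```python
+import json
+import os
+import socket
+from pathlib import Path
+
+# One subdirectory per replica: /results/<hostname>/ maps back to the pod
+# when madengine copies the PVC contents to the host.
+out_dir = Path("/results") / os.environ.get("HOSTNAME", socket.gethostname())
+out_dir.mkdir(parents=True, exist_ok=True)
+
+metrics = {"throughput_samples_per_s": 123.4}  # placeholder payload
+(out_dir / "metrics.json").write_text(json.dumps(metrics))
+```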
+
 ### Distributed Launchers
 
 **Training Launchers:**
diff --git a/examples/k8s-configs/basic/02-torchrun-single-node-multi-gpu.json b/examples/k8s-configs/basic/02-torchrun-single-node-multi-gpu.json
index f198dff7..be0d7c5e 100644
--- a/examples/k8s-configs/basic/02-torchrun-single-node-multi-gpu.json
+++ b/examples/k8s-configs/basic/02-torchrun-single-node-multi-gpu.json
@@ -17,7 +17,8 @@
     "cpu_limit": "32",
     "image_pull_policy": "Always",
-    "backoff_limit": 3
+    "backoff_limit": 3,
+    "recreate_shared_data_pvc": true
   },
 
   "distributed": {
diff --git a/examples/k8s-configs/basic/03-torchrun-multi-node-basic.json b/examples/k8s-configs/basic/03-torchrun-multi-node-basic.json
index 0d35cb2b..0c2205f9 100644
--- a/examples/k8s-configs/basic/03-torchrun-multi-node-basic.json
+++ b/examples/k8s-configs/basic/03-torchrun-multi-node-basic.json
@@ -18,7 +18,8 @@
     "image_pull_policy": "Always",
     "backoff_limit": 3,
-    "host_ipc": true
+    "host_ipc": true,
+    "recreate_shared_data_pvc": true
   },
 
   "distributed": {
diff --git a/src/madengine/deployment/kubernetes.py b/src/madengine/deployment/kubernetes.py
index d4dcdf44..71550d5e 100644
--- a/src/madengine/deployment/kubernetes.py
+++ b/src/madengine/deployment/kubernetes.py
@@ -74,6 +74,44 @@
 from .kubernetes_launcher_mixin import KubernetesLauncherMixin
 
+
+def match_pvc_subdir_to_k8s_pod(
+    pvc_subdir: str,
+    pod_names: List[str],
+    assigned: set,
+) -> Optional[str]:
+    """
+    Map one top-level name under /results/ to a full Kubernetes pod name.
+
+    Matches ``pod == pvc_subdir`` or ``pod.startswith(pvc_subdir + "-")`` among
+    pods not yet assigned. Prefers exact equality; with multiple prefix matches,
+    picks the first sorted name (deterministic).
+    """
+    available = sorted(p for p in pod_names if p not in assigned)
+    exact = [p for p in available if p == pvc_subdir]
+    if exact:
+        return exact[0]
+    prefixed = [p for p in available if p.startswith(pvc_subdir + "-")]
+    if not prefixed:
+        return None
+    return sorted(prefixed)[0]
+
+
+def assign_pvc_subdirs_to_pods(pod_dirs: List[str], pod_names: List[str]) -> Dict[str, str]:
+    """
+    Assign each PVC subdir to at most one pod. Process longest names first so
+    short prefixes do not steal pods (e.g. ``foo-0`` before ``foo``).
+    """
+    cleaned = [d.strip() for d in pod_dirs if d and d.strip()]
+    assigned: set = set()
+    mapping: Dict[str, str] = {}
+    for pvc_subdir in sorted(cleaned, key=lambda x: (-len(x), x)):
+        m = match_pvc_subdir_to_k8s_pod(pvc_subdir, pod_names, assigned)
+        if m:
+            mapping[pvc_subdir] = m
+            assigned.add(m)
+    return mapping
+
+
 class KubernetesDeployment(KubernetesLauncherMixin, BaseDeployment):
     """
     Kubernetes cluster deployment using Python client library.
@@ -1329,28 +1367,65 @@ def _save_debug_manifests(self):
             f"[yellow]Debug: Manifests saved to {output_dir}[/yellow]"
         )
 
-    def _create_results_pvc(self) -> str:
+    def _k8s_data_storage_class(self) -> Optional[str]:
+        """StorageClass for long-lived ``madengine-shared-data`` (NFS RWX recommended)."""
+        return (
+            self.k8s_config.get("data_storage_class")
+            or self.k8s_config.get("nfs_storage_class")
+            or self.k8s_config.get("storage_class")
+        )
+
+    def _k8s_results_storage_class(self, nnodes: int) -> Optional[str]:
         """
-        Create a PersistentVolumeClaim for results storage.
-
-        Returns:
-            Name of the created PVC
+        Per-job results: local-path (RWO) for single-node, NFS (RWX) for multi-node.
+
+        Falls back to ``storage_class`` for backward compatibility.
+        """
+        if nnodes > 1:
+            return (
+                self.k8s_config.get("multi_node_results_storage_class")
+                or self.k8s_config.get("nfs_storage_class")
+                or self.k8s_config.get("storage_class")
+            )
+        return (
+            self.k8s_config.get("single_node_results_storage_class")
+            or self.k8s_config.get("local_path_storage_class")
+            or self.k8s_config.get("storage_class")
+        )
+
+    def _create_results_pvc(self, nnodes: int = 1) -> str:
+        """
+        Create a PersistentVolumeClaim for per-job results.
+
+        Single-node uses ReadWriteOnce (typically local-path). Multi-node uses
+        ReadWriteMany (typically nfs-banff or another RWX class).
         """
         pvc_name = f"{self.job_name}-results"
-
-        # Render PVC template
+        access_mode = "ReadWriteMany" if nnodes > 1 else "ReadWriteOnce"
+        storage_class = self._k8s_results_storage_class(nnodes)
+
         template_dir = Path(__file__).parent / "templates" / "kubernetes"
         pvc_template = template_dir / "pvc.yaml.j2"
-
+
         with open(pvc_template, "r") as f:
             pvc_template_str = f.read()
-
+
         template = Template(pvc_template_str)
+        self.console.print(
+            f"[dim]  Results PVC: access={access_mode}, "
+            f"storageClass={storage_class or '(cluster default)'}[/dim]"
+        )
+        if nnodes > 1 and not storage_class:
+            self.console.print(
+                "[yellow]⚠️  Multi-node: set k8s.nfs_storage_class or "
+                "multi_node_results_storage_class to an RWX class (e.g. nfs-banff).[/yellow]"
+            )
         pvc_yaml = template.render(
             pvc_name=pvc_name,
             namespace=self.namespace,
+            access_mode=access_mode,
             storage_size=self.k8s_config.get("results_storage_size", "10Gi"),
-            storage_class=self.k8s_config.get("storage_class")
+            storage_class=storage_class,
         )
 
         # Create PVC (retry on 409 "object is being deleted" until it is gone)
@@ -1375,98 +1450,129 @@
             else:
                 raise
 
+    def _wait_for_pvc_deleted(self, pvc_name: str, max_wait: int = 90) -> None:
+        """Block until the PVC is fully removed (or timeout)."""
+        for i in range(max_wait):
+            try:
+                self.core_v1.read_namespaced_persistent_volume_claim(
+                    name=pvc_name, namespace=self.namespace
+                )
+                if i > 0 and i % 10 == 0:
+                    self.console.print(
+                        f"[dim]Waiting for PVC {pvc_name} to be removed... ({i}s)[/dim]"
+                    )
+                time.sleep(1)
+            except ApiException as e:
+                if e.status == 404:
+                    return
+                raise
+
     def _create_or_get_data_pvc(self, nnodes: int = 1) -> str:
         """
-        Create or reuse a shared PersistentVolumeClaim for data storage.
-
-        K8s best practice: Use shared PVC for data (separate from compute pods).
-        This PVC is reusable across multiple training runs.
-
+        Create or reuse ``madengine-shared-data`` for long-lived datasets (cache).
+
+        Always uses ReadWriteMany + an NFS-style StorageClass so the same PVC
+        works for single- and multi-pod jobs. Use ``data_storage_class`` or
+        ``nfs_storage_class`` (e.g. nfs-banff), not local-path.
+
         Args:
-            nnodes: Number of nodes (determines access mode requirements)
-
+            nnodes: Reserved for logging (shared-data access mode does not depend on it).
+
         Returns:
             Name of the PVC (existing or newly created)
         """
-        # Use a consistent name for reusability (not job-specific)
         pvc_name = "madengine-shared-data"
-
-        # Check if PVC already exists (idempotent)
+
+        if self.k8s_config.get("recreate_shared_data_pvc"):
+            try:
+                self.core_v1.delete_namespaced_persistent_volume_claim(
+                    name=pvc_name, namespace=self.namespace
+                )
+                self.console.print(
+                    "[yellow]recreate_shared_data_pvc: deleted existing "
+                    f"{pvc_name} (back up data first if needed)[/yellow]"
+                )
+                self._wait_for_pvc_deleted(pvc_name)
+            except ApiException as e:
+                if e.status != 404:
+                    raise
+
         try:
             existing_pvc = self.core_v1.read_namespaced_persistent_volume_claim(
                 name=pvc_name,
-                namespace=self.namespace
+                namespace=self.namespace,
             )
             self.console.print(f"[dim]✓ Using existing data PVC: {pvc_name}[/dim]")
-
-            # Verify access mode for multi-node
-            if nnodes > 1:
-                access_modes = existing_pvc.spec.access_modes
-                if "ReadWriteMany" not in access_modes:
-                    self.console.print(
-                        f"[yellow]⚠️  Warning: PVC {pvc_name} doesn't support ReadWriteMany[/yellow]"
-                    )
-                    self.console.print(
-                        f"[yellow]   Multi-node deployment may fail. Current modes: {access_modes}[/yellow]"
-                    )
-
+
+            access_modes = existing_pvc.spec.access_modes or []
+            if "ReadWriteMany" not in access_modes:
+                self.console.print(
+                    f"[yellow]⚠️  Warning: {pvc_name} is not ReadWriteMany "
+                    f"(modes: {access_modes}).[/yellow]"
+                )
+                self.console.print(
+                    "[yellow]   For NFS-backed long-lived data, delete the PVC and re-run with "
+                    "k8s.data_storage_class / nfs_storage_class set, or use "
+                    "recreate_shared_data_pvc (after backup).[/yellow]"
+                )
             return pvc_name
-
+
         except ApiException as e:
             if e.status != 404:
-                raise  # Unexpected error
-
-        # PVC doesn't exist, create it
-        # Determine access mode based on deployment topology
-        # RWO (ReadWriteOnce): Single-node - works with most storage classes (local-path, EBS, etc.)
-        # RWX (ReadWriteMany): Multi-node - requires shared storage (NFS, CephFS, etc.)
-        access_mode = "ReadWriteMany" if nnodes > 1 else "ReadWriteOnce"
-
-        self.console.print(f"[blue]Creating shared data PVC: {pvc_name}...[/blue]")
-        self.console.print(f"[dim]  Access mode: {access_mode} ({'multi-node' if nnodes > 1 else 'single-node'})[/dim]")
-
-        # Render data PVC template
-        template_dir = Path(__file__).parent / "templates" / "kubernetes"
-        pvc_template = template_dir / "pvc-data.yaml.j2"
-
-        with open(pvc_template, "r") as f:
-            pvc_template_str = f.read()
-
-        template = Template(pvc_template_str)
-        pvc_yaml = template.render(
-            pvc_name=pvc_name,
-            namespace=self.namespace,
-            access_mode=access_mode,
-            storage_size=self.k8s_config.get("data_storage_size", "100Gi"),
-            storage_class=self.k8s_config.get("storage_class")
-        )
-
-        # Create PVC
-        pvc_dict = yaml.safe_load(pvc_yaml)
-        self.core_v1.create_namespaced_persistent_volume_claim(
-            namespace=self.namespace, body=pvc_dict
+                raise
+
+        access_mode = "ReadWriteMany"
+        storage_class = self._k8s_data_storage_class()
+        self.console.print(f"[blue]Creating shared data PVC: {pvc_name}...[/blue]")
+        self.console.print(
+            f"[dim]  Access mode: {access_mode}; storageClass={storage_class or '(cluster default)'}; "
+            f"nnodes={nnodes}[/dim]"
+        )
+        if not storage_class:
+            self.console.print(
+                "[yellow]⚠️  Set k8s.nfs_storage_class or data_storage_class to an RWX class "
+                "(e.g. nfs-banff) for shared-data. Default SC may be local-path (RWO-only).[/yellow]"
            )
-
-        # Wait for PVC to be bound (important!)
-        self.console.print(f"[dim]Waiting for PVC to be bound...[/dim]")
-        for _ in range(30):  # Wait up to 30 seconds
-            try:
-                pvc = self.core_v1.read_namespaced_persistent_volume_claim(
-                    name=pvc_name, namespace=self.namespace
-                )
-                if pvc.status.phase == "Bound":
-                    self.console.print(f"[green]✓ PVC bound successfully[/green]")
-                    break
-            except ApiException:
-                pass
-            time.sleep(1)
-        else:
-            self.console.print(
-                f"[yellow]⚠️  Warning: PVC created but not bound yet. "
-                f"Check: kubectl describe pvc {pvc_name}[/yellow]"
+
+        template_dir = Path(__file__).parent / "templates" / "kubernetes"
+        pvc_template = template_dir / "pvc-data.yaml.j2"
+
+        with open(pvc_template, "r") as f:
+            pvc_template_str = f.read()
+
+        template = Template(pvc_template_str)
+        pvc_yaml = template.render(
+            pvc_name=pvc_name,
+            namespace=self.namespace,
+            access_mode=access_mode,
+            storage_size=self.k8s_config.get("data_storage_size", "100Gi"),
+            storage_class=storage_class,
+        )
+
+        pvc_dict = yaml.safe_load(pvc_yaml)
+        self.core_v1.create_namespaced_persistent_volume_claim(
+            namespace=self.namespace, body=pvc_dict
+        )
+
+        self.console.print("[dim]Waiting for PVC to be bound...[/dim]")
+        for _ in range(30):
+            try:
+                pvc = self.core_v1.read_namespaced_persistent_volume_claim(
+                    name=pvc_name, namespace=self.namespace
                )
-
-        return pvc_name
+                if pvc.status.phase == "Bound":
+                    self.console.print("[green]✓ PVC bound successfully[/green]")
+                    break
+            except ApiException:
+                pass
+            time.sleep(1)
+        else:
+            self.console.print(
+                f"[yellow]⚠️  Warning: PVC created but not bound yet. "
+                f"Check: kubectl describe pvc {pvc_name}[/yellow]"
+            )
+
+        return pvc_name
 
     def _cleanup_existing_resources(self):
         """Delete existing Job, ConfigMap, and Service if they exist."""
@@ -1570,7 +1676,8 @@ def deploy(self) -> DeploymentResult:
 
         # 1. Create PVC for results storage
         self.console.print("[blue]Creating PVC for results storage...[/blue]")
-        pvc_name = self._create_results_pvc()
+        nnodes_deploy = getattr(self, "_nnodes", 1)
+        pvc_name = self._create_results_pvc(nnodes=nnodes_deploy)
         self.console.print(f"[green]✓ Created PVC: {pvc_name}[/green]")
 
         # 1b. Create or reuse data PVC if data provider is configured and auto-creation was flagged
@@ -1812,6 +1919,48 @@ def _print_pod_logs_on_failure(self, deployment_id: str):
         except Exception:
             pass
 
+    def _primary_workload_container_exit_code(self, pod: Any) -> int:
+        """
+        Exit code of the primary workload container (spec.containers[0]), matched by
+        name against container_statuses (ordering-safe if sidecars are added later).
+        """
+        if not pod.spec or not pod.spec.containers:
+            return 0
+        primary_name = pod.spec.containers[0].name
+        for cs in pod.status.container_statuses or []:
+            if cs.name == primary_name and cs.state and cs.state.terminated:
+                return cs.state.terminated.exit_code or 0
+        # Fallback: first terminated container in spec order
+        name_order = [c.name for c in pod.spec.containers]
+        for want in name_order:
+            for cs in pod.status.container_statuses or []:
+                if cs.name == want and cs.state and cs.state.terminated:
+                    return cs.state.terminated.exit_code or 0
+        return 0
+
+    def _refresh_pod_until_terminal_phase(
+        self,
+        pod_name: str,
+        *,
+        timeout_seconds: float = 30.0,
+        interval_seconds: float = 0.5,
+    ) -> Any:
+        """
+        Poll read_namespaced_pod until the phase is Succeeded or Failed, or timeout.
+        Avoids a stale read right after Job completion (phase may still say Running).
+        """
+        deadline = time.monotonic() + timeout_seconds
+        last: Any = None
+        while time.monotonic() < deadline:
+            last = self.core_v1.read_namespaced_pod(
+                name=pod_name, namespace=self.namespace
+            )
+            phase = last.status.phase
+            if phase in ("Succeeded", "Failed"):
+                return last
+            time.sleep(interval_seconds)
+        return last
+
     def collect_results(self, deployment_id: str) -> Dict[str, Any]:
         """
         Enhanced results collection from K8s pods following vLLM multi-node best practices.
@@ -1822,9 +1971,9 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]:
         - Total throughput = pod-0 throughput × num_replicas
 
         Collects:
-        1. Pod logs
-        2. File artifacts via kubectl cp (profiling, tracing, env details)
-        3. Results from shared PVC (if configured)
+        1. Pod logs (``k8s_results/<job>/<pod>/pod.log``)
+        2. PVC mirror per pod (``.../<pod>/pvc/``), mapped from ``/results/<subdir>/``
+        3. File artifacts via kubectl cp when pods are still running (keep-alive path)
 
         Returns:
             Dict with logs, artifacts, and performance results
@@ -1926,7 +2075,7 @@
                 log = self.core_v1.read_namespaced_pod_log(
                     name=pod_name, namespace=self.namespace
                 )
-                log_file = pod_dir / f"{pod_name}.log"
+                log_file = pod_dir / "pod.log"
                 log_file.write_text(log)
                 results["logs"].append({
                     "pod": pod_name,
@@ -1939,13 +2088,12 @@
                     log, model_info.get("name", "")
                 )
 
-                # Get pod exit status
-                pod_status = pod.status.phase
-                pod_exit_code = 0
-                if pod.status.container_statuses:
-                    container_status = pod.status.container_statuses[0]
-                    if container_status.state.terminated:
-                        pod_exit_code = container_status.state.terminated.exit_code or 0
+                # Pod phase/exit can lag right after Job success; poll until terminal or timeout
+                pod = self._refresh_pod_until_terminal_phase(pod_name)
+                pod_status = pod.status.phase if pod else "Unknown"
+                pod_exit_code = (
+                    self._primary_workload_container_exit_code(pod) if pod else -1
+                )
 
                 # Store per-node info for display table
                 node_info = {
@@ -2006,7 +2154,8 @@
             )
 
         # Collect artifacts from PVC before deciding success/failure (needed for multiple_results fallback)
-        self._collect_from_pvc(deployment_id, results_dir, results)
+        k8s_pod_names = [p.metadata.name for p in sorted_pods]
+        self._collect_from_pvc(deployment_id, results_dir, results, pod_names=k8s_pod_names)
 
         # ========================================================================
         # Aggregate per-node metrics
@@ -2401,17 +2550,30 @@ def _collect_pod_artifacts(self, pod_name: str, dest_dir: Path) -> List[Dict]:
 
         return artifacts
 
-    def _collect_from_pvc(self, deployment_id: str, results_dir: Path, results: Dict):
+    def _collect_from_pvc(
+        self,
+        deployment_id: str,
+        results_dir: Path,
+        results: Dict,
+        pod_names: Optional[List[str]] = None,
+    ):
         """
         Collect all artifacts from the PVC using a temporary busybox pod.
-
+
         This is the best practice for collecting results from completed K8s jobs.
         kubectl cp doesn't work on completed pods, so we use a helper pod.
-
+
+        When ``pod_names`` is provided, each ``/results/<subdir>/`` is copied to
+        ``results_dir/<pod>/pvc/`` by matching the subdir to a pod name (exact or
+        ``pod.startswith(subdir + "-")``). Unmatched subdirs go under
+        ``results_dir/pvc_unmapped/<subdir>/``. When ``pod_names`` is omitted, the
+        legacy layout ``results_dir/<subdir>/`` is used.
+
         Args:
             deployment_id: Job deployment ID
             results_dir: Local directory to save results
            results: Results dict to update
+            pod_names: Full Kubernetes pod names for this job (ordered)
         """
         pvc_name = f"{deployment_id}-results"
@@ -2470,48 +2632,118 @@
                 time.sleep(1)
             else:
                 raise Exception("Collector pod did not start in time")
-
-            # List pod result directories in PVC
+
+            # Mount / NFS may need a moment before another pod sees prior job writes.
+            time.sleep(2)
+
+            # List pod result directories in PVC (retry: NFS can lag right after Job completion)
             list_cmd = [
-                "kubectl", "exec", collector_pod_name, "-n", self.namespace, "--",
-                "ls", "-1", "/results/"
+                "kubectl",
+                "exec",
+                collector_pod_name,
+                "-n",
+                self.namespace,
+                "-c",
+                "collector",
+                "--",
+                "ls",
+                "-1",
+                "/results/",
             ]
-            list_result = subprocess.run(list_cmd, capture_output=True, text=True, timeout=10)
-
-            if list_result.returncode == 0 and list_result.stdout.strip():
-                pod_dirs = list_result.stdout.strip().split('\n')
-
+            list_result = subprocess.CompletedProcess(
+                args=list_cmd, returncode=-1, stdout="", stderr=""
+            )
+            pod_dirs: List[str] = []
+            for attempt in range(45):
+                list_result = subprocess.run(
+                    list_cmd, capture_output=True, text=True, timeout=30
+                )
+                if list_result.returncode == 0 and list_result.stdout.strip():
+                    pod_dirs = [
+                        d
+                        for d in list_result.stdout.strip().split("\n")
+                        if d and d != "lost+found"
+                    ]
+                    if pod_dirs:
+                        break
+                if list_result.stderr.strip():
+                    self.console.print(
+                        f"[dim]  PVC ls attempt {attempt + 1} (rc={list_result.returncode}): "
+                        f"{list_result.stderr.strip()[:300]}[/dim]"
+                    )
+                time.sleep(1)
+
+            if pod_dirs:
+                pvc_map: Dict[str, str] = {}
+                if pod_names:
+                    pvc_map = assign_pvc_subdirs_to_pods(pod_dirs, pod_names)
+
                 for pod_dir_name in pod_dirs:
                     if not pod_dir_name:
                         continue
-
-                    # Copy entire pod directory
-                    local_pod_dir = results_dir / pod_dir_name
-                    local_pod_dir.mkdir(exist_ok=True)
-
+
+                    matched_pod = pvc_map.get(pod_dir_name) if pod_names else None
+                    if pod_names:
+                        if matched_pod:
+                            local_pod_dir = results_dir / matched_pod / "pvc"
+                        else:
+                            local_pod_dir = results_dir / "pvc_unmapped" / pod_dir_name
+                    else:
+                        local_pod_dir = results_dir / pod_dir_name
+
+                    local_pod_dir.mkdir(parents=True, exist_ok=True)
+
                     cp_cmd = [
-                        "kubectl", "cp",
+                        "kubectl",
+                        "cp",
+                        "-c",
+                        "collector",
                         f"{self.namespace}/{collector_pod_name}:/results/{pod_dir_name}",
-                        str(local_pod_dir)
+                        str(local_pod_dir),
                     ]
-
+
                     cp_result = subprocess.run(cp_cmd, capture_output=True, text=True, timeout=60)
-
+
                     if cp_result.returncode == 0:
                         # Count collected files
                         file_count = sum(1 for _ in local_pod_dir.rglob('*') if _.is_file())
                         if file_count > 0:
-                            results["artifacts"].append({
+                            art: Dict[str, Any] = {
                                 "source": f"PVC:{pvc_name}/{pod_dir_name}",
                                 "local_path": str(local_pod_dir),
                                 "file_count": file_count,
-                                "type": "pvc_collection"
-                            })
-                            self.console.print(f"[dim]  ✓ Collected {file_count} files from {pod_dir_name}[/dim]")
+                                "type": "pvc_collection",
+                                "pvc_subdir": pod_dir_name,
+                            }
+                            if pod_names:
+                                art["k8s_pod"] = matched_pod
+                            results["artifacts"].append(art)
+                            if matched_pod:
+                                dest_hint = f"{matched_pod}/pvc"
+                            elif pod_names:
+                                dest_hint = f"pvc_unmapped/{pod_dir_name}"
+                            else:
+                                dest_hint = pod_dir_name
+                            self.console.print(
+                                f"[dim]  ✓ Collected {file_count} files from {pod_dir_name} → {dest_hint}[/dim]"
+                            )
 
                 self.console.print(f"[green]✓ Collected artifacts from PVC[/green]")
             else:
-                self.console.print(f"[yellow]⚠ No results found in PVC[/yellow]")
+                hint = ""
+                if list_result.returncode != 0 or list_result.stderr.strip():
+                    hint = (
+                        f" (kubectl exec rc={list_result.returncode}"
+                        + (
+                            f", stderr={list_result.stderr.strip()[:400]!r}"
+                            if list_result.stderr.strip()
+                            else ""
+                        )
+                        + ")"
+                    )
+                self.console.print(
+                    f"[yellow]⚠ No results found in PVC after retries{hint}[/yellow]"
+                )
 
             # Cleanup collector pod
             self.core_v1.delete_namespaced_pod(
@@ -2533,6 +2765,12 @@ def _generate_results_summary(self, results: Dict, results_dir: Path):
             "job_name": results["job_name"],
             "namespace": results["namespace"],
             "collected_at": datetime.now().isoformat(),
+            "k8s_results_layout": (
+                "Per pod: <job>/<pod>/pod.log (API log) and "
+                "<job>/<pod>/pvc/ (mirror of /results/<subdir>/). "
+                "Unmatched PVC subdirs: <job>/pvc_unmapped/<subdir>/."
+            ),
+            "layout_version": 2,
             "pods": len(results["logs"]),
             "total_artifacts": len(results["artifacts"]),
             "artifacts_by_type": {},
diff --git a/src/madengine/deployment/presets/k8s/defaults.json b/src/madengine/deployment/presets/k8s/defaults.json
index 8d25eccd..36fc9f3e 100644
--- a/src/madengine/deployment/presets/k8s/defaults.json
+++ b/src/madengine/deployment/presets/k8s/defaults.json
@@ -11,6 +11,10 @@
   "backoff_limit": 3,
   "ttl_seconds_after_finished": null,
   "allow_privileged_profiling": null,
+  "nfs_storage_class": "nfs-banff",
+  "local_path_storage_class": "local-path",
+  "data_storage_class": "nfs-banff",
+  "recreate_shared_data_pvc": false,
   "secrets": {
     "strategy": "from_local_credentials",
     "image_pull_secret_names": [],
diff --git a/src/madengine/deployment/templates/kubernetes/pvc-data.yaml.j2 b/src/madengine/deployment/templates/kubernetes/pvc-data.yaml.j2
index 68a1934b..c5bc8396 100644
--- a/src/madengine/deployment/templates/kubernetes/pvc-data.yaml.j2
+++ b/src/madengine/deployment/templates/kubernetes/pvc-data.yaml.j2
@@ -11,9 +11,8 @@ metadata:
     description: "Shared data storage for madengine (auto-created)"
 spec:
   accessModes:
-    # RWO for single-node (broader storage class support)
-    # RWX for multi-node (requires NFS or similar)
-    - {{ access_mode | default("ReadWriteOnce") }}
+    # Long-lived shared data: always ReadWriteMany (NFS/EFS) for single- and multi-pod.
+    - {{ access_mode | default("ReadWriteMany") }}
   resources:
     requests:
       storage: {{ storage_size | default("100Gi") }}
diff --git a/src/madengine/deployment/templates/kubernetes/pvc.yaml.j2 b/src/madengine/deployment/templates/kubernetes/pvc.yaml.j2
index 2852a355..fe1395e0 100644
--- a/src/madengine/deployment/templates/kubernetes/pvc.yaml.j2
+++ b/src/madengine/deployment/templates/kubernetes/pvc.yaml.j2
@@ -8,7 +8,8 @@ metadata:
     madengine-pvc: "true"
 spec:
   accessModes:
-    - ReadWriteOnce  # Single-node access is sufficient for per-job results collection
+    # RWO: single-node results (e.g. local-path). RWX: multi-node (e.g. nfs-banff).
+    - {{ access_mode | default("ReadWriteOnce") }}
   resources:
     requests:
       storage: {{ storage_size | default("10Gi") }}
diff --git a/tests/unit/test_k8s_secrets.py b/tests/unit/test_k8s.py
similarity index 58%
rename from tests/unit/test_k8s_secrets.py
rename to tests/unit/test_k8s.py
index 78865b4c..7bcf49a2 100644
--- a/tests/unit/test_k8s_secrets.py
+++ b/tests/unit/test_k8s.py
@@ -1,4 +1,9 @@
-"""Unit tests for Kubernetes Secret helpers and ConfigMap size estimate."""
+"""
+Kubernetes-related unit tests (secrets/config helpers, PVC → pod mapping).
+
+Keep new K8s-focused unit tests here to avoid many small `test_k8s_*.py` files.
+Integration/e2e tests stay in their own modules.
+"""
 
 from madengine.deployment.k8s_secrets import (
     CONFIGMAP_MAX_BYTES,
@@ -11,6 +16,10 @@
     resolve_runtime_secret_name,
     build_registry_secret_data,
 )
+from madengine.deployment.kubernetes import (
+    assign_pvc_subdirs_to_pods,
+    match_pvc_subdir_to_k8s_pod,
+)
 
 
 def test_merge_secrets_config_defaults():
@@ -99,3 +108,49 @@ def test_estimate_skips_credential_when_not_in_configmap():
         "common_script_contents": {},
     }
     assert estimate_configmap_payload_bytes(ctx) < 100
+
+
+# --- PVC /results subdir → pod name (kubernetes.collect_results) ------------
+
+
+def test_pvc_match_exact():
+    assigned: set = set()
+    assert match_pvc_subdir_to_k8s_pod("my-pod", ["my-pod", "my-pod-0-abc"], assigned) == "my-pod"
+    assigned.add("my-pod")
+    assert match_pvc_subdir_to_k8s_pod("my-pod", ["my-pod", "my-pod-0-abc"], assigned) == "my-pod-0-abc"
+
+
+def test_pvc_match_prefix_indexed_job():
+    assigned: set = set()
+    pods = ["madengine-dummy-torchrun-0-fz7th", "madengine-dummy-torchrun-1-88hw6"]
+    assert (
+        match_pvc_subdir_to_k8s_pod("madengine-dummy-torchrun-0", pods, assigned)
+        == "madengine-dummy-torchrun-0-fz7th"
+    )
+    assigned.add("madengine-dummy-torchrun-0-fz7th")
+    assert (
+        match_pvc_subdir_to_k8s_pod("madengine-dummy-torchrun-1", pods, assigned)
+        == "madengine-dummy-torchrun-1-88hw6"
+    )
+
+
+def test_pvc_assign_longest_subdir_first():
+    pod_names = ["madengine-dummy-torchrun-0-fz7th", "madengine-dummy-torchrun-1-88hw6"]
+    mapping = assign_pvc_subdirs_to_pods(
+        ["madengine-dummy-torchrun-0", "madengine-dummy-torchrun-1"],
+        pod_names,
+    )
+    assert mapping["madengine-dummy-torchrun-0"] == "madengine-dummy-torchrun-0-fz7th"
+    assert mapping["madengine-dummy-torchrun-1"] == "madengine-dummy-torchrun-1-88hw6"
+
+
+def test_pvc_assign_no_duplicate_pods():
+    pods = ["a-x", "a-y"]
+    m = assign_pvc_subdirs_to_pods(["a"], pods)
+    assert len(m) == 1
+    assert m["a"] in pods
+
+
+def test_pvc_assign_empty_dirs():
+    assert assign_pvc_subdirs_to_pods([], ["p"]) == {}
+    assert assign_pvc_subdirs_to_pods([" ", ""], ["p"]) == {}
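+
+
+# A few extra edge-case sketches. Assumptions: unmatched subdirs are meant for
+# pvc_unmapped/ (per _collect_from_pvc), and _primary_workload_container_exit_code
+# only reads attribute shapes, so SimpleNamespace stand-ins and an unused `self`
+# are sufficient here.
+
+
+def test_pvc_match_returns_none_without_match():
+    # No exact or "<subdir>-" prefix match: the caller files the subdir under pvc_unmapped/.
+    assert match_pvc_subdir_to_k8s_pod("other", ["a-x", "b-y"], set()) is None
+    # A bare prefix without the "-" separator must not match.
+    assert match_pvc_subdir_to_k8s_pod("a", ["ab-x"], set()) is None
+
+
+def test_pvc_assign_more_subdirs_than_pods():
+    # Once pods are exhausted, remaining subdirs stay unmapped.
+    assert assign_pvc_subdirs_to_pods(["a", "b"], ["a-x"]) == {"a": "a-x"}
+
+
+def test_primary_container_exit_code_matched_by_name():
+    from types import SimpleNamespace
+
+    from madengine.deployment.kubernetes import KubernetesDeployment
+
+    def terminated(code):
+        return SimpleNamespace(terminated=SimpleNamespace(exit_code=code))
+
+    pod = SimpleNamespace(
+        spec=SimpleNamespace(containers=[SimpleNamespace(name="main")]),
+        status=SimpleNamespace(
+            container_statuses=[
+                SimpleNamespace(name="sidecar", state=terminated(1)),
+                SimpleNamespace(name="main", state=terminated(0)),
+            ]
+        ),
+    )
+    # Unbound call: the method does not touch `self`.
+    assert KubernetesDeployment._primary_workload_container_exit_code(None, pod) == 0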