diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 98511bfa..a5d2b1ee 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -91,9 +91,6 @@ jobs: fail-fast: false matrix: include: - - script: L2_Functional_Tests_GPU - runner: self-hosted-nemo - timeout: 30 - script: L2_Mcore_Mock_Tests_GPU runner: self-hosted-nemo timeout: 30 diff --git a/dfm/src/megatron/model/wan/flow_matching/flow_inference_pipeline.py b/dfm/src/megatron/model/wan/flow_matching/flow_inference_pipeline.py index 2bbb0eb3..459876b2 100644 --- a/dfm/src/megatron/model/wan/flow_matching/flow_inference_pipeline.py +++ b/dfm/src/megatron/model/wan/flow_matching/flow_inference_pipeline.py @@ -229,13 +229,13 @@ def forward_pp_step( """ pp_world_size = parallel_state.get_pipeline_model_parallel_world_size() - is_pp_first = parallel_state.is_pipeline_first_stage(ignore_virtual=True) - is_pp_last = parallel_state.is_pipeline_last_stage(ignore_virtual=True) - - # PP=1: no pipeline parallelism + # PP=1: no pipeline parallelism (avoid touching PP groups which may be uninitialized in unit tests) if pp_world_size == 1: noise_pred_pp = self.model(latent_model_input, grid_sizes=grid_sizes, t=timestep, **arg_c) return noise_pred_pp + # For PP>1, safe to query stage information + is_pp_first = parallel_state.is_pipeline_first_stage(ignore_virtual=True) + is_pp_last = parallel_state.is_pipeline_last_stage(ignore_virtual=True) # PP>1: pipeline parallelism hidden_size = self.model.config.hidden_size diff --git a/tests/unit_tests/L0_Unit_Tests_CPU.sh b/tests/unit_tests/L0_Unit_Tests_CPU.sh index 081c1564..dd8c1ee4 100644 --- a/tests/unit_tests/L0_Unit_Tests_CPU.sh +++ b/tests/unit_tests/L0_Unit_Tests_CPU.sh @@ -14,4 +14,4 @@ # Hide GPU from PyTorch by setting CUDA_VISIBLE_DEVICES to empty # This makes torch.cuda.is_available() return False -CUDA_VISIBLE_DEVICES="" uv run coverage run -a --data-file=/opt/DFM/.coverage --source=/opt/DFM/ -m pytest tests/unit_tests -m "not pleasefixme" --with_downloads +CUDA_VISIBLE_DEVICES="" uv run --group megatron-bridge coverage run -a --data-file=/opt/DFM/.coverage --source=/opt/DFM/ -m pytest tests/unit_tests -m "not pleasefixme" --with_downloads diff --git a/tests/unit_tests/L0_Unit_Tests_GPU.sh b/tests/unit_tests/L0_Unit_Tests_GPU.sh index ae77eb3f..0468cab6 100644 --- a/tests/unit_tests/L0_Unit_Tests_GPU.sh +++ b/tests/unit_tests/L0_Unit_Tests_GPU.sh @@ -11,4 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -CUDA_VISIBLE_DEVICES="0,1" uv run coverage run -a --data-file=/opt/DFM/.coverage --source=/opt/DFM/ -m pytest tests/unit_tests -m "not pleasefixme" --with_downloads +CUDA_VISIBLE_DEVICES="0,1" uv run --group megatron-bridge coverage run -a --data-file=/opt/DFM/.coverage --source=/opt/DFM/ -m pytest tests/unit_tests -m "not pleasefixme" --with_downloads diff --git a/tests/unit_tests/megatron/data/wan/test_wan_energon_datamodule.py b/tests/unit_tests/megatron/data/wan/test_wan_energon_datamodule.py new file mode 100644 index 00000000..c4dc6014 --- /dev/null +++ b/tests/unit_tests/megatron/data/wan/test_wan_energon_datamodule.py @@ -0,0 +1,67 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dfm.src.megatron.data.wan import wan_energon_datamodule as wan_dm_mod +from dfm.src.megatron.data.wan.wan_taskencoder import WanTaskEncoder + + +class _FakeDiffusionDataModule: + def __init__( + self, + *, + path: str, + seq_length: int, + packing_buffer_size: int, + task_encoder, + micro_batch_size: int, + global_batch_size: int, + num_workers: int, + ): + self.path = path + self.seq_length = seq_length + self.packing_buffer_size = packing_buffer_size + self.task_encoder = task_encoder + self.micro_batch_size = micro_batch_size + self.global_batch_size = global_batch_size + self.num_workers = num_workers + + # mimic API used by WanDataModuleConfig.build_datasets + def train_dataloader(self): + return "train" + + +def test_wan_datamodule_config_initialization(monkeypatch): + # Patch the symbol used inside wan_energon_datamodule module + monkeypatch.setattr(wan_dm_mod, "DiffusionDataModule", _FakeDiffusionDataModule) + + cfg = wan_dm_mod.WanDataModuleConfig( + path="", + seq_length=128, + task_encoder_seq_length=128, + packing_buffer_size=4, + micro_batch_size=2, + global_batch_size=8, + num_workers=0, + ) + + # __post_init__ should construct a dataset with WanTaskEncoder and propagate seq_length + assert isinstance(cfg.dataset, _FakeDiffusionDataModule) + assert cfg.sequence_length == cfg.dataset.seq_length == 128 + assert isinstance(cfg.dataset.task_encoder, WanTaskEncoder) + assert cfg.dataset.task_encoder.seq_length == 128 + assert cfg.dataset.task_encoder.packing_buffer_size == 4 + + # build_datasets should return train loader thrice + train, val, test = cfg.build_datasets(context=None) + assert train == "train" and val == "train" and test == "train" diff --git a/tests/unit_tests/megatron/data/wan/test_wan_mock_datamodule.py b/tests/unit_tests/megatron/data/wan/test_wan_mock_datamodule.py new file mode 100644 index 00000000..e1980052 --- /dev/null +++ b/tests/unit_tests/megatron/data/wan/test_wan_mock_datamodule.py @@ -0,0 +1,65 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +from torch.utils.data import DataLoader + +from dfm.src.megatron.data.wan.wan_mock_datamodule import WanMockDataModuleConfig + + +def test_wan_mock_datamodule_build_and_batch_shapes(): + cfg = WanMockDataModuleConfig( + path="", + seq_length=128, + packing_buffer_size=2, + micro_batch_size=2, + global_batch_size=8, + num_workers=0, + # Use small shapes for a light-weight test run + F_latents=4, + H_latents=8, + W_latents=6, + patch_spatial=2, + patch_temporal=1, + number_packed_samples=2, + context_seq_len=16, + context_embeddings_dim=64, + ) + train_dl, val_dl, test_dl = cfg.build_datasets(_context=None) + assert isinstance(train_dl, DataLoader) + assert train_dl is val_dl and val_dl is test_dl + + batch = next(iter(train_dl)) + expected_keys = { + "video_latents", + "context_embeddings", + "loss_mask", + "seq_len_q", + "seq_len_q_padded", + "seq_len_kv", + "seq_len_kv_padded", + "grid_sizes", + "video_metadata", + } + assert expected_keys.issubset(set(batch.keys())) + + # Basic sanity checks on shapes/dtypes + assert batch["video_latents"].dim() == 3 and batch["video_latents"].shape[1] == 1 + assert batch["context_embeddings"].dim() == 3 and batch["context_embeddings"].shape[1] == 1 + assert batch["loss_mask"].dim() == 2 and batch["loss_mask"].shape[1] == 1 + assert batch["seq_len_q"].dtype == torch.int32 + assert batch["seq_len_q_padded"].dtype == torch.int32 + assert batch["seq_len_kv"].dtype == torch.int32 + assert batch["seq_len_kv_padded"].dtype == torch.int32 + assert batch["grid_sizes"].dtype == torch.int32 diff --git a/tests/unit_tests/megatron/data/wan/test_wan_taskencoder.py b/tests/unit_tests/megatron/data/wan/test_wan_taskencoder.py new file mode 100644 index 00000000..e739a1ec --- /dev/null +++ b/tests/unit_tests/megatron/data/wan/test_wan_taskencoder.py @@ -0,0 +1,154 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch + +from dfm.src.megatron.data.wan.wan_taskencoder import WanTaskEncoder, cook, parallel_state + + +def test_cook_extracts_expected_fields(): + sample = { + "__key__": "k", + "__restore_key__": "rk", + "__subflavors__": [], + "json": {"meta": 1}, + "pth": torch.randn(1, 2, 2, 2), + "pickle": torch.randn(3, 4), + "unused": 123, + } + out = cook(sample) + assert "json" in out and out["json"] is sample["json"] + assert "pth" in out and torch.equal(out["pth"], sample["pth"]) + assert "pickle" in out and torch.equal(out["pickle"], sample["pickle"]) + # ensure basic keys from the sample are preserved by cook via basic_sample_keys() + assert out["__key__"] == sample["__key__"] + assert out["__restore_key__"] == sample["__restore_key__"] + assert out["__subflavors__"] == sample["__subflavors__"] + + +def test_encode_sample_no_context_parallel(monkeypatch): + # Ensure CP world size is 1 to avoid extra padding branch + monkeypatch.setattr(parallel_state, "get_context_parallel_world_size", lambda: 1, raising=False) + # Ensure seeded wrapper has an active worker config + from megatron.energon.task_encoder.base import WorkerConfig + + class _FakeWorkerCfg: + def worker_seed(self): + return 123 + + active_worker_sample_index = 0 + + monkeypatch.setattr(WorkerConfig, "active_worker_config", _FakeWorkerCfg(), raising=False) + + # Construct a minimal, consistent sample + c = 8 + F_latents, H_latents, W_latents = 4, 8, 6 + patch_temporal, patch_spatial = 1, 2 + # video latent before patchify has shape [c, F_latents, H_latents, W_latents] + # where grid sizes (patch counts) are (F_latents // pF, H_latents // pH, W_latents // pW) + video_latent = torch.randn(c, F_latents, H_latents, W_latents) + context_len, context_dim = 256, 64 + context_embeddings = torch.randn(context_len, context_dim) + sample = { + "__key__": "k", + "__restore_key__": "rk", + "__subflavors__": [], + "json": {"meta": 1}, + "pth": video_latent, + "pickle": context_embeddings, + } + + enc = WanTaskEncoder( + seq_length=1024, patch_temporal=patch_temporal, patch_spatial=patch_spatial, packing_buffer_size=None + ) + out = enc.encode_sample(sample) + + # Grid / patches + F_patches = F_latents // patch_temporal + H_patches = H_latents // patch_spatial + W_patches = W_latents // patch_spatial + num_patches = F_patches * H_patches * W_patches + patch_vec_dim = c * patch_temporal * patch_spatial * patch_spatial + + assert out.video.shape == (num_patches, patch_vec_dim) + assert out.latent_shape.dtype == torch.int32 + assert torch.equal(out.latent_shape, torch.tensor([F_patches, H_patches, W_patches], dtype=torch.int32)) + + # Loss mask and seq lengths + assert out.loss_mask.dtype == torch.bfloat16 + assert out.loss_mask.shape[0] == num_patches + assert torch.equal(out.seq_len_q, torch.tensor([num_patches], dtype=torch.int32)) + # context embeddings are padded to fixed 512 inside encode_sample + assert torch.equal(out.seq_len_kv, torch.tensor([512], dtype=torch.int32)) + assert torch.equal(out.seq_len_q_padded, out.seq_len_q) + assert torch.equal(out.seq_len_kv_padded, out.seq_len_kv) + + # Metadata passthrough + assert out.video_metadata == sample["json"] + assert out.__key__ == sample["__key__"] + assert out.__restore_key__ == sample["__restore_key__"] + assert out.__subflavors__ == sample["__subflavors__"] + + +def test_batch_with_packing_buffer_size(monkeypatch): + # Force CP world size 1 + monkeypatch.setattr(parallel_state, "get_context_parallel_world_size", lambda: 1, raising=False) + # Ensure seeded wrapper has an active worker config + from megatron.energon.task_encoder.base import WorkerConfig + + class _FakeWorkerCfg: + def worker_seed(self): + return 456 + + active_worker_sample_index = 0 + + monkeypatch.setattr(WorkerConfig, "active_worker_config", _FakeWorkerCfg(), raising=False) + + c = 4 + F_latents, H_latents, W_latents = 2, 4, 4 + patch_temporal, patch_spatial = 1, 2 + video_latent = torch.randn(c, F_latents * patch_temporal, H_latents * patch_spatial, W_latents * patch_spatial) + sample = { + "__key__": "k", + "__restore_key__": "rk", + "__subflavors__": [], + "json": {"meta": 1}, + "pth": video_latent, + "pickle": torch.randn(32, 128), + } + + enc = WanTaskEncoder( + seq_length=256, patch_temporal=patch_temporal, patch_spatial=patch_spatial, packing_buffer_size=3 + ) + diff_sample = enc.encode_sample(sample) + batch = enc.batch([diff_sample]) + + assert isinstance(batch, dict) + for k in [ + "video_latents", + "context_embeddings", + "loss_mask", + "seq_len_q", + "seq_len_q_padded", + "seq_len_kv", + "seq_len_kv_padded", + "grid_sizes", + "video_metadata", + ]: + assert k in batch + + # video_latents: [S, 1, ...], where S equals sample.video length when CP world size is 1 + assert batch["video_latents"].shape[1] == 1 + assert batch["context_embeddings"].shape[1] == 1 + assert batch["loss_mask"].shape[1] == 1 diff --git a/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_inference_pipeline.py b/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_inference_pipeline.py new file mode 100644 index 00000000..95e2cfca --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_inference_pipeline.py @@ -0,0 +1,90 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pytest +import torch + +from dfm.src.megatron.model.wan.flow_matching.flow_inference_pipeline import FlowInferencePipeline + + +def test_select_checkpoint_dir_latest(tmp_path): + base = tmp_path / "ckpts" + os.makedirs(base / "iter_0000100") + os.makedirs(base / "iter_0000200") + + # Minimal inference config object + class _Cfg: + num_train_timesteps = 1000 + param_dtype = torch.float32 + text_len = 512 + t5_dtype = torch.float32 + vae_stride = (1, 1, 1) + patch_size = (1, 1, 1) + + # Instantiate object without running heavy init by patching __init__ to a no-op + pip = object.__new__(FlowInferencePipeline) + + pip.inference_cfg = _Cfg() + + latest = FlowInferencePipeline._select_checkpoint_dir(pip, str(base), checkpoint_step=None) + assert latest.endswith("iter_0000200") + + specific = FlowInferencePipeline._select_checkpoint_dir(pip, str(base), checkpoint_step=100) + assert specific.endswith("iter_0000100") + + with pytest.raises(FileNotFoundError): + FlowInferencePipeline._select_checkpoint_dir(pip, str(base), checkpoint_step=999) + + +def test_forward_pp_step_no_pp(monkeypatch): + # Build a minimal instance skipping heavy init + pip = object.__new__(FlowInferencePipeline) + + class _Model: + class _Cfg: + hidden_size = 16 + qkv_format = "sbhd" + + config = _Cfg() + + def __call__(self, x, grid_sizes, t, **kwargs): + return x # echo input + + def set_input_tensor(self, x): + pass + + pip.model = _Model() + + # Patch parallel state to no-PP path + from megatron.core import parallel_state + + monkeypatch.setattr(parallel_state, "get_pipeline_model_parallel_world_size", lambda: 1, raising=False) + + S, B, H = 8, 1, pip.model.config.hidden_size + latent_model_input = torch.randn(S, B, H, dtype=torch.float32) + grid_sizes = [(2, 2, 2)] + timestep = torch.tensor([10.0], dtype=torch.float32) + arg_c = {} + + out = FlowInferencePipeline.forward_pp_step( + pip, + latent_model_input=latent_model_input, + grid_sizes=grid_sizes, + max_video_seq_len=S, + timestep=timestep, + arg_c=arg_c, + ) + assert out.shape == latent_model_input.shape diff --git a/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_pipeline.py b/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_pipeline.py new file mode 100644 index 00000000..d93a4298 --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/flow_matching/test_flow_pipeline.py @@ -0,0 +1,124 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import torch + +from dfm.src.megatron.model.wan.flow_matching import flow_pipeline as flow_pipeline_mod +from dfm.src.megatron.model.wan.flow_matching.flow_pipeline import FlowPipeline + + +class _DummyModel: + class _Cfg: + in_channels = 2 + patch_spatial = 1 + patch_temporal = 1 + + def __init__(self): + self.config = self._Cfg() + + def __call__(self, x, grid_sizes, t, context, packed_seq_params): + # Return zeros matching input shape (seq_len, 1, latent_dim) + return torch.zeros_like(x) + + +def test_flow_pipeline_training_step_cpu_stub(monkeypatch): + # Bypass heavy diffusers init + def _stub_init(self, model_id="x", seed=0): + self.pipe = types.SimpleNamespace( + scheduler=types.SimpleNamespace(config=types.SimpleNamespace(num_train_timesteps=1000)) + ) + + monkeypatch.setattr(FlowPipeline, "__init__", _stub_init) + + # Make patchify accept both tensor and list for this test + def _safe_patchify(x, patch_size): + # Always delegate to the real implementation in utils to avoid recursion + from dfm.src.megatron.model.wan import utils as wan_utils + + impl = wan_utils.patchify + # Normalize inputs to expected 4D [C, F, H, W] without batch dim + if isinstance(x, list): + x_norm = [] + for t in x: + if isinstance(t, torch.Tensor) and t.dim() == 5 and t.size(0) == 1: + x_norm.append(t.squeeze(0)) + else: + x_norm.append(t) + else: + t = x + if isinstance(t, torch.Tensor) and t.dim() == 5 and t.size(0) == 1: + t = t.squeeze(0) + x_norm = [t] + return impl(x_norm, patch_size) + + monkeypatch.setattr(flow_pipeline_mod, "patchify", _safe_patchify) + + # Disable context parallelism and force last pipeline stage + from megatron.core import parallel_state + + monkeypatch.setattr(parallel_state, "get_context_parallel_world_size", lambda: 1, raising=False) + monkeypatch.setattr(parallel_state, "is_pipeline_last_stage", lambda: True, raising=False) + + pipe = FlowPipeline() + model = _DummyModel() + + # Build a minimal, consistent batch: seq_len = F*H*W = 2*2*2 = 8, latent_dim = in_channels * pF * pH * pW = 2 + F, H, W = 2, 2, 2 + seq_len = F * H * W + latent_dim = model.config.in_channels + + video_latents = torch.randn(seq_len, 1, latent_dim, dtype=torch.float32) + context_embeddings = torch.randn(4, 1, 8, dtype=torch.float32) + loss_mask = torch.ones(seq_len, dtype=torch.bfloat16) + grid_sizes = torch.tensor([[F, H, W]], dtype=torch.int32) + + # Packed seq params with simple cumulative lengths + from megatron.core.packed_seq_params import PackedSeqParams + + cu = torch.tensor([0, seq_len], dtype=torch.int32) + packed_seq_params = { + "self_attention": PackedSeqParams( + cu_seqlens_q=cu, cu_seqlens_q_padded=cu, cu_seqlens_kv=cu, cu_seqlens_kv_padded=cu, qkv_format="sbhd" + ), + "cross_attention": PackedSeqParams( + cu_seqlens_q=cu, cu_seqlens_q_padded=cu, cu_seqlens_kv=cu, cu_seqlens_kv_padded=cu, qkv_format="sbhd" + ), + } + + batch = { + "video_latents": video_latents, + "context_embeddings": context_embeddings, + "loss_mask": loss_mask, + "grid_sizes": grid_sizes, + "packed_seq_params": packed_seq_params, + "video_metadata": {}, + } + + model_pred, weighted_loss, split_loss_mask = pipe.training_step( + model, + batch, + use_sigma_noise=True, + timestep_sampling="uniform", + flow_shift=3.0, + mix_uniform_ratio=1.0, # force uniform branch + sigma_min=0.0, + sigma_max=1.0, + ) + + # Basic shape checks + assert model_pred.shape == video_latents.shape + assert weighted_loss.shape[:2] == video_latents.shape[:2] + assert split_loss_mask.shape == loss_mask.shape diff --git a/tests/unit_tests/megatron/model/wan/flow_matching/test_time_shift_utils.py b/tests/unit_tests/megatron/model/wan/flow_matching/test_time_shift_utils.py new file mode 100644 index 00000000..b239584c --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/flow_matching/test_time_shift_utils.py @@ -0,0 +1,66 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch + +from dfm.src.megatron.model.wan.flow_matching.time_shift_utils import ( + compute_density_for_timestep_sampling, + get_flow_match_loss_weight, + time_shift, +) + + +def test_time_shift_constant_linear_sqrt_bounds_and_monotonic(): + t_small = torch.tensor(0.1, dtype=torch.float32) + t_large = torch.tensor(0.9, dtype=torch.float32) + seq_len = 512 + + # constant + s_small = time_shift(t_small, image_seq_len=seq_len, shift_type="constant", constant=3.0) + s_large = time_shift(t_large, image_seq_len=seq_len, shift_type="constant", constant=3.0) + assert 0.0 <= s_small.item() <= 1.0 + assert 0.0 <= s_large.item() <= 1.0 + assert s_large > s_small + + # linear + s_small = time_shift(t_small, image_seq_len=seq_len, shift_type="linear", base_shift=0.5, max_shift=1.15) + s_large = time_shift(t_large, image_seq_len=seq_len, shift_type="linear", base_shift=0.5, max_shift=1.15) + assert 0.0 <= s_small.item() <= 1.0 + assert 0.0 <= s_large.item() <= 1.0 + assert s_large > s_small + + # sqrt + s_small = time_shift(t_small, image_seq_len=seq_len, shift_type="sqrt") + s_large = time_shift(t_large, image_seq_len=seq_len, shift_type="sqrt") + assert 0.0 <= s_small.item() <= 1.0 + assert 0.0 <= s_large.item() <= 1.0 + assert s_large > s_small + + +def test_compute_density_for_timestep_sampling_modes_and_ranges(): + batch_size = 16 + for mode in ["uniform", "logit_normal", "mode"]: + u = compute_density_for_timestep_sampling(mode, batch_size=batch_size, logit_mean=0.0, logit_std=1.0) + assert u.shape == (batch_size,) + assert torch.all((0.0 <= u) & (u <= 1.0)) + + +def test_get_flow_match_loss_weight_simple_cases(): + sigma = torch.zeros(5, dtype=torch.float32) + w = get_flow_match_loss_weight(sigma, shift=3.0) + assert torch.allclose(w, torch.ones_like(w)) + + sigma = torch.ones(5, dtype=torch.float32) + w = get_flow_match_loss_weight(sigma, shift=2.0) + assert torch.allclose(w, torch.full_like(sigma, 3.0)) diff --git a/tests/unit_tests/megatron/model/wan/inference/test_inference_init.py b/tests/unit_tests/megatron/model/wan/inference/test_inference_init.py new file mode 100644 index 00000000..f8005047 --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/inference/test_inference_init.py @@ -0,0 +1,40 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dfm.src.megatron.model.wan.inference import MAX_AREA_CONFIGS, SIZE_CONFIGS, SUPPORTED_SIZES + + +def test_size_configs_structure_and_values(): + assert isinstance(SIZE_CONFIGS, dict) + for key, val in SIZE_CONFIGS.items(): + assert isinstance(key, str) + assert isinstance(val, tuple) and len(val) == 2 + w, h = val + assert isinstance(w, int) and isinstance(h, int) + assert w > 0 and h > 0 + + +def test_max_area_configs_consistency(): + for size_key, area in MAX_AREA_CONFIGS.items(): + w, h = SIZE_CONFIGS[size_key] + assert area == w * h + + +def test_supported_sizes_lists(): + assert "t2v-14B" in SUPPORTED_SIZES + assert "t2v-1.3B" in SUPPORTED_SIZES + for model_key, sizes in SUPPORTED_SIZES.items(): + assert isinstance(sizes, tuple) + for s in sizes: + assert s in SIZE_CONFIGS diff --git a/tests/unit_tests/megatron/model/wan/inference/test_inference_utils.py b/tests/unit_tests/megatron/model/wan/inference/test_inference_utils.py new file mode 100644 index 00000000..8445fddd --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/inference/test_inference_utils.py @@ -0,0 +1,84 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import tempfile + +import torch + +from dfm.src.megatron.model.wan.inference import utils as inf_utils + + +def test_str2bool_variants_and_errors(): + true_vals = ["yes", "true", "t", "y", "1", "TRUE", "Yes"] + false_vals = ["no", "false", "f", "n", "0", "FALSE", "No"] + for v in true_vals: + assert inf_utils.str2bool(v) is True + for v in false_vals: + assert inf_utils.str2bool(v) is False + assert inf_utils.str2bool(True) is True + assert inf_utils.str2bool(False) is False + try: + inf_utils.str2bool("maybe") + except argparse.ArgumentTypeError: + pass + else: + assert False, "Expected argparse.ArgumentTypeError for invalid boolean string" + + +def test_cache_image_writes_file(tmp_path): + # Small 3x8x8 image + img = torch.rand(3, 8, 8) + out_path = tmp_path / "test.png" + saved = inf_utils.cache_image(img, str(out_path), nrow=1, normalize=False, value_range=(0.0, 1.0), retry=1) + assert saved == str(out_path) + assert os.path.exists(out_path) + assert os.path.getsize(out_path) > 0 + + +def test_cache_video_uses_writer_and_returns_path(monkeypatch): + # Stub imageio.get_writer to avoid codec dependency + calls = {"frames": 0, "path": None} + + class _DummyWriter: + def __init__(self, path, fps=None, codec=None, quality=None): + calls["path"] = path + + def append_data(self, frame): + calls["frames"] += 1 + + def close(self): + pass + + monkeypatch.setattr( + inf_utils.imageio, "get_writer", lambda path, fps, codec, quality: _DummyWriter(path, fps, codec, quality) + ) + + # Stub make_grid to return a fixed CHW tensor regardless of input + def _fake_make_grid(x, nrow, normalize, value_range): + return torch.rand(3, 4, 5) + + monkeypatch.setattr(inf_utils.torchvision.utils, "make_grid", _fake_make_grid) + + # Build a tensor whose unbind(2) yields 2 slices so we expect 2 frames written + vid = torch.rand(3, 3, 2, 2) # shape chosen to exercise unbind(2) + with tempfile.TemporaryDirectory() as td: + out_file = os.path.join(td, "out.mp4") + result = inf_utils.cache_video( + vid, save_file=out_file, fps=5, suffix=".mp4", nrow=1, normalize=False, value_range=(0.0, 1.0), retry=1 + ) + assert result == out_file + assert calls["path"] == out_file + assert calls["frames"] == vid.shape[2] # frames equal to number of unbinds on dim=2 diff --git a/tests/unit_tests/megatron/model/wan/test_rope_utils.py b/tests/unit_tests/megatron/model/wan/test_rope_utils.py new file mode 100644 index 00000000..7e31d8d0 --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/test_rope_utils.py @@ -0,0 +1,49 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch + +from dfm.src.megatron.model.wan.rope_utils import Wan3DRopeEmbeddings + + +def test_wan3d_rope_embeddings_shapes_and_padding(): + # Small, CPU-friendly config + n_head = 2 + dim_head = 8 # must be divisible with the internal splits + max_position_len = 16 + rope = Wan3DRopeEmbeddings(dim_head=dim_head, max_position_len=max_position_len) + + # Two samples with different (f, h, w) + grid_sizes = torch.tensor([[2, 3, 2], [4, 1, 1]], dtype=torch.int32) + seq_lens = [(2 * 3 * 2), (4 * 1 * 1)] + padded_lens = [seq_lens[0] + 2, seq_lens[1]] # pad first sample + + cu_seqlens_q_padded = torch.tensor([0, padded_lens[0], padded_lens[0] + padded_lens[1]], dtype=torch.int32) + + out = rope( + n_head=n_head, + dim_head=dim_head, + cu_seqlens_q_padded=cu_seqlens_q_padded, + grid_sizes=grid_sizes, + device=torch.device("cpu"), + ) + + # Total concatenated length equals sum of padded lens + assert out.shape == (sum(padded_lens), 1, 1, dim_head) + + # Check that padding region for the first sample is zero + first_seq_len = seq_lens[0] + first_padded_len = padded_lens[0] + tail = out[first_seq_len:first_padded_len] + assert torch.all(tail == 0), "Padded region should be zeros" diff --git a/tests/unit_tests/megatron/model/wan/test_utils.py b/tests/unit_tests/megatron/model/wan/test_utils.py new file mode 100644 index 00000000..3f89b4cd --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/test_utils.py @@ -0,0 +1,48 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch + +from dfm.src.megatron.model.wan.utils import grid_sizes_calculation, patchify, unpatchify + + +def test_grid_sizes_calculation_basic(): + input_shape = (4, 8, 6) + patch_size = (1, 2, 3) + f, h, w = grid_sizes_calculation(input_shape, patch_size) + assert (f, h, w) == (4, 4, 2) + + +def test_patchify_unpatchify_roundtrip(): + # Video latent: [c, F_patches * pF, H_patches * pH, W_patches * pW] + c = 3 + F_patches, H_patches, W_patches = 2, 2, 3 + patch_size = (1, 2, 2) + F_latents = F_patches * patch_size[0] + H_latents = H_patches * patch_size[1] + W_latents = W_patches * patch_size[2] + + x = [torch.randn(c, F_latents, H_latents, W_latents)] + + patches = patchify(x, patch_size) + assert isinstance(patches, list) and len(patches) == 1 + seq_len, dim = patches[0].shape + assert seq_len == F_patches * H_patches * W_patches + assert dim == c * (patch_size[0] * patch_size[1] * patch_size[2]) + + # Unpatchify and compare + y = unpatchify(patches, [[F_patches, H_patches, W_patches]], out_dim=c, patch_size=patch_size) + assert isinstance(y, list) and len(y) == 1 + assert y[0].shape == x[0].shape + torch.testing.assert_close(y[0], x[0], rtol=1e-5, atol=1e-5) diff --git a/tests/unit_tests/megatron/model/wan/test_wan_layer_spec.py b/tests/unit_tests/megatron/model/wan/test_wan_layer_spec.py new file mode 100644 index 00000000..21ee570e --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/test_wan_layer_spec.py @@ -0,0 +1,26 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dfm.src.megatron.model.wan.wan_layer_spec import get_wan_block_with_transformer_engine_spec + + +def test_get_wan_block_with_transformer_engine_spec_basic(): + spec = get_wan_block_with_transformer_engine_spec() + # Basic structure checks + assert hasattr(spec, "module") + assert hasattr(spec, "submodules") + sub = spec.submodules + # Expected submodule fields exist + for name in ["norm1", "norm2", "norm3", "full_self_attention", "cross_attention", "mlp"]: + assert hasattr(sub, name), f"Missing submodule {name}" diff --git a/tests/functional_tests/L2_Functional_Tests_GPU.sh b/tests/unit_tests/megatron/model/wan/test_wan_model_misc.py similarity index 64% rename from tests/functional_tests/L2_Functional_Tests_GPU.sh rename to tests/unit_tests/megatron/model/wan/test_wan_model_misc.py index ae77eb3f..de141def 100644 --- a/tests/functional_tests/L2_Functional_Tests_GPU.sh +++ b/tests/unit_tests/megatron/model/wan/test_wan_model_misc.py @@ -11,4 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -CUDA_VISIBLE_DEVICES="0,1" uv run coverage run -a --data-file=/opt/DFM/.coverage --source=/opt/DFM/ -m pytest tests/unit_tests -m "not pleasefixme" --with_downloads + +import torch + +from dfm.src.megatron.model.wan.wan_model import sinusoidal_embedding_1d + + +def test_sinusoidal_embedding_1d_shape_and_dtype(): + dim = 16 + pos = torch.arange(10, dtype=torch.float32) + emb = sinusoidal_embedding_1d(dim, pos) + assert emb.shape == (pos.shape[0], dim) + assert emb.dtype == torch.float32 diff --git a/tests/unit_tests/megatron/model/wan/test_wan_provider.py b/tests/unit_tests/megatron/model/wan/test_wan_provider.py new file mode 100644 index 00000000..78541900 --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/test_wan_provider.py @@ -0,0 +1,84 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn as nn +from megatron.core import parallel_state + +import dfm.src.megatron.model.wan.wan_model as wan_model_module +from dfm.src.megatron.model.wan.wan_model import WanModel +from dfm.src.megatron.model.wan.wan_provider import WanModelProvider + + +def test_wan_model_provider_provide_returns_model(monkeypatch): + # Force pipeline stage booleans to avoid dependency on initialized model parallel + monkeypatch.setattr(parallel_state, "is_pipeline_first_stage", lambda: True, raising=False) + monkeypatch.setattr(parallel_state, "is_pipeline_last_stage", lambda: True, raising=False) + # Avoid querying uninitialized PP groups + monkeypatch.setattr(parallel_state, "get_pipeline_model_parallel_world_size", lambda: 1, raising=False) + + # Bypass Megatron's ProcessGroupCollection usage inside TransformerBlock during construction. + # CI does not initialize distributed groups; a dummy block suffices for construction checks. + class DummyTransformerBlock(nn.Module): + def __init__(self, *args, **kwargs): + super().__init__() + self.input_tensor = None + + def set_input_tensor(self, input_tensor): + self.input_tensor = input_tensor + + def forward(self, hidden_states, **kwargs): + return hidden_states + + monkeypatch.setattr(wan_model_module, "TransformerBlock", DummyTransformerBlock, raising=False) + + provider = WanModelProvider( + num_layers=2, # keep small + hidden_size=64, + ffn_hidden_size=128, + num_attention_heads=4, + layernorm_epsilon=1e-6, + normalization="RMSNorm", + layernorm_zero_centered_gamma=False, + layernorm_across_heads=True, + add_qkv_bias=True, + rotary_interleaved=True, + hidden_dropout=0.0, + attention_dropout=0.0, + fp16_lm_cross_entropy=False, + parallel_output=True, + bf16=False, + params_dtype=torch.float32, + qkv_format="sbhd", + seq_length=128, + share_embeddings_and_output_weights=False, + vocab_size=32000, + make_vocab_size_divisible_by=128, + in_channels=4, + out_channels=4, + patch_spatial=2, + patch_temporal=1, + freq_dim=16, + text_len=32, + text_dim=64, + ) + # Ensure config supplies fields expected by core attention + provider.kv_channels = provider.hidden_size // provider.num_attention_heads + provider.num_query_groups = provider.num_attention_heads + model = provider.provide() + assert isinstance(model, WanModel) + # Sanity check key config properties were plumbed + assert model.config.hidden_size == 64 + assert model.config.num_attention_heads == 4 + assert model.config.text_dim == 64 diff --git a/tests/unit_tests/megatron/model/wan/test_wan_step.py b/tests/unit_tests/megatron/model/wan/test_wan_step.py new file mode 100644 index 00000000..8ee0e9cb --- /dev/null +++ b/tests/unit_tests/megatron/model/wan/test_wan_step.py @@ -0,0 +1,62 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import torch + +from dfm.src.megatron.model.wan.wan_step import WanForwardStep, wan_data_step + + +class _DummyIter: + def __init__(self, batch): + # mimic attribute used inside wan_data_step + self.iterable = [batch] + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="wan_data_step moves tensors to CUDA") +def test_wan_data_step_builds_packed_seq_params_cuda_guarded(): + # Construct minimal batch with required seq_len fields + batch = { + "seq_len_q": torch.tensor([3, 5], dtype=torch.int32), + "seq_len_q_padded": torch.tensor([4, 6], dtype=torch.int32), + "seq_len_kv": torch.tensor([2, 7], dtype=torch.int32), + "seq_len_kv_padded": torch.tensor([2, 8], dtype=torch.int32), + # include a tensor field to exercise device transfer + "video_latents": torch.randn(8, 1, 4, dtype=torch.float32), + } + it = _DummyIter(batch) + qkv_format = "sbhd" + out = wan_data_step(qkv_format, it) + + assert "packed_seq_params" in out + for k in ["self_attention", "cross_attention"]: + assert k in out["packed_seq_params"] + p = out["packed_seq_params"][k] + assert hasattr(p, "cu_seqlens_q") + assert hasattr(p, "cu_seqlens_q_padded") + assert hasattr(p, "cu_seqlens_kv") + assert hasattr(p, "cu_seqlens_kv_padded") + # spot-check CUDA device after move + assert out["video_latents"].is_cuda + + +def test_wan_forward_step_loss_partial_creation(): + step = WanForwardStep() + mask = torch.ones(4, dtype=torch.float32) + loss_fn = step._create_loss_function(mask, check_for_nan_in_loss=False, check_for_spiky_loss=False) + # Just validate it's callable and is a functools.partial + import functools + + assert isinstance(loss_fn, functools.partial) + assert callable(loss_fn)