diff --git a/tests/test_highresnet.py b/tests/test_highresnet.py
index 1e6c973b2d..6a4b129588 100644
--- a/tests/test_highresnet.py
+++ b/tests/test_highresnet.py
@@ -15,7 +15,7 @@
 from parameterized import parameterized

 from monai.networks.nets import HighResNet
-from tests.utils import test_script_save
+from tests.utils import DistTestCase, TimedCall, test_script_save

 device = "cuda" if torch.cuda.is_available() else "cpu"

@@ -44,7 +44,7 @@
 ]


-class TestHighResNet(unittest.TestCase):
+class TestHighResNet(DistTestCase):
     @parameterized.expand([TEST_CASE_1, TEST_CASE_2, TEST_CASE_3, TEST_CASE_4])
     def test_shape(self, input_param, input_shape, expected_shape):
         net = HighResNet(**input_param).to(device)
@@ -53,6 +53,7 @@ def test_shape(self, input_param, input_shape, expected_shape):
         result = net.forward(torch.randn(input_shape).to(device))
         self.assertEqual(result.shape, expected_shape)

+    @TimedCall(seconds=100, force_quit=True)
     def test_script(self):
         input_param, input_shape, expected_shape = TEST_CASE_1
         net = HighResNet(**input_param)
diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py
index ba08a40b56..b9a458ac86 100644
--- a/tests/test_integration_classification_2d.py
+++ b/tests/test_integration_classification_2d.py
@@ -25,7 +25,7 @@
 from monai.transforms import AddChannel, Compose, LoadPNG, RandFlip, RandRotate, RandZoom, ScaleIntensity, ToTensor
 from monai.utils import set_determinism
 from tests.testing_data.integration_answers import test_integration_value
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick

 TEST_DATA_URL = "https://www.dropbox.com/s/5wwskxctvcxiuea/MedNIST.tar.gz?dl=1"
 MD5_VALUE = "0bc7306e7427e00ad1c5526a6677552d"
@@ -45,7 +45,7 @@ def __getitem__(self, index):
         return self.transforms(self.image_files[index]), self.labels[index]


-def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0"):
+def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0", num_workers=10):
     monai.config.print_config()

     # define transforms for image and classification
@@ -65,10 +65,10 @@ def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0")

     # create train, val data loaders
     train_ds = MedNISTDataset(train_x, train_y, train_transforms)
-    train_loader = DataLoader(train_ds, batch_size=300, shuffle=True, num_workers=10)
+    train_loader = DataLoader(train_ds, batch_size=300, shuffle=True, num_workers=num_workers)

     val_ds = MedNISTDataset(val_x, val_y, val_transforms)
-    val_loader = DataLoader(val_ds, batch_size=300, num_workers=10)
+    val_loader = DataLoader(val_ds, batch_size=300, num_workers=num_workers)

     model = densenet121(spatial_dims=2, in_channels=1, out_channels=len(np.unique(train_y))).to(device)
     loss_function = torch.nn.CrossEntropyLoss()
@@ -127,11 +127,11 @@ def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0")
     return epoch_loss_values, best_metric, best_metric_epoch


-def run_inference_test(root_dir, test_x, test_y, device="cuda:0"):
+def run_inference_test(root_dir, test_x, test_y, device="cuda:0", num_workers=10):
     # define transforms for image and classification
     val_transforms = Compose([LoadPNG(image_only=True), AddChannel(), ScaleIntensity(), ToTensor()])
     val_ds = MedNISTDataset(test_x, test_y, val_transforms)
-    val_loader = DataLoader(val_ds, batch_size=300, num_workers=10)
+    val_loader = DataLoader(val_ds, batch_size=300, num_workers=num_workers)

     model = densenet121(spatial_dims=2, in_channels=1, out_channels=len(np.unique(test_y))).to(device)
@@ -152,7 +152,7 @@ def run_inference_test(root_dir, test_x, test_y, device="cuda:0"):


 @skip_if_quick
-class IntegrationClassification2D(unittest.TestCase):
+class IntegrationClassification2D(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)
         self.data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testing_data")
@@ -198,7 +198,7 @@ def setUp(self):
                 self.train_x.append(image_file_list[i])
                 self.train_y.append(image_classes[i])

-        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")
+        self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

     def tearDown(self):
         set_determinism(seed=None)
@@ -208,38 +208,45 @@ def tearDown(self):
             warnings.warn("not found best_metric_model.pth, training skipped?")
             pass

-    def test_training(self):
+    def train_and_infer(self, idx=0):
+        results = []
         if not os.path.exists(os.path.join(self.data_dir, "MedNIST")):
             # skip test if no MedNIST dataset
-            return
+            return results
+
+        set_determinism(seed=0)
+        losses, best_metric, best_metric_epoch = run_training_test(
+            self.data_dir, self.train_x, self.train_y, self.val_x, self.val_y, device=self.device
+        )
+        infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device)
+
+        print(f"integration_classification_2d {losses}")
+        print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        # check training properties
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
+        self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4))
+        np.testing.assert_allclose(best_metric_epoch, 4)
+        model_file = os.path.join(self.data_dir, "best_metric_model.pth")
+        self.assertTrue(os.path.exists(model_file))
+        # check inference properties
+        self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1))
+        results.extend(losses)
+        results.append(best_metric)
+        results.extend(infer_metric)
+        return results
+
+    def test_training(self):
         repeated = []
         for i in range(2):
-            set_determinism(seed=0)
-
-            repeated.append([])
-            losses, best_metric, best_metric_epoch = run_training_test(
-                self.data_dir, self.train_x, self.train_y, self.val_x, self.val_y, device=self.device
-            )
-
-            # check training properties
-            print(f"integration_classification_2d {losses}")
-            self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
-            repeated[i].extend(losses)
-            print("best metric", best_metric)
-            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4))
-            repeated[i].append(best_metric)
-            np.testing.assert_allclose(best_metric_epoch, 4)
-            model_file = os.path.join(self.data_dir, "best_metric_model.pth")
-            self.assertTrue(os.path.exists(model_file))
-
-            infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device)
-            print("infer metric", infer_metric)
-            # check inference properties
-            self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1))
-            repeated[i].extend(infer_metric)
-
+            results = self.train_and_infer(i)
+            repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])

+    @TimedCall(seconds=500, skip_timing=not torch.cuda.is_available(), daemon=False)
+    def test_timing(self):
+        self.train_and_infer()
+

 if __name__ == "__main__":
     unittest.main()
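The hunk above sets the pattern that recurs throughout this patch: the training/inference body moves into a shared train_and_infer helper, test_training calls it twice to assert run-to-run determinism, and a new test_timing reuses the same helper under a TimedCall wall-clock budget, with skip_timing=not torch.cuda.is_available() disabling the deadline on CPU-only machines. A minimal sketch of the pattern (class name and metric values below are illustrative, not from the patch):

    import unittest

    from tests.utils import DistTestCase, TimedCall


    class ExampleIntegration(DistTestCase):
        def train_and_infer(self, idx=0):
            # one deterministic training + inference round; returns the metrics
            return [0.125, 0.5]

        def test_training(self):
            # repeatability: two rounds must produce identical numbers
            repeated = [self.train_and_infer(i) for i in range(2)]
            self.assertEqual(repeated[0], repeated[1])

        @TimedCall(seconds=500, skip_timing=True, daemon=False)
        def test_timing(self):
            # wall-clock budget; skip_timing=True disables the deadline entirely
            self.train_and_infer()


    if __name__ == "__main__":
        unittest.main()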
diff --git a/tests/test_integration_determinism.py b/tests/test_integration_determinism.py
index 552fe11394..dbabc96da1 100644
--- a/tests/test_integration_determinism.py
+++ b/tests/test_integration_determinism.py
@@ -20,6 +20,7 @@
 from monai.networks.nets import UNet
 from monai.transforms import AddChannel, Compose, RandRotate90, RandSpatialCrop, ScaleIntensity, ToTensor
 from monai.utils import set_determinism
+from tests.utils import DistTestCase, TimedCall


 def run_test(batch_size=64, train_steps=200, device="cuda:0"):
@@ -67,15 +68,16 @@ def __len__(self):
     return epoch_loss, step


-class TestDeterminism(unittest.TestCase):
+class TestDeterminism(DistTestCase):
     def setUp(self):
-        set_determinism(seed=0)
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
         set_determinism(seed=None)

+    @TimedCall(seconds=30)
     def test_training(self):
+        set_determinism(seed=0)
         loss, step = run_test(device=self.device)
         print(f"Deterministic loss {loss} at training step {step}")
         np.testing.assert_allclose(step, 4)
diff --git a/tests/test_integration_segmentation_3d.py b/tests/test_integration_segmentation_3d.py
index 2a7cc0de6d..cfeefc3f46 100644
--- a/tests/test_integration_segmentation_3d.py
+++ b/tests/test_integration_segmentation_3d.py
@@ -40,7 +40,7 @@
 from monai.utils import set_determinism
 from monai.visualize import plot_2d_or_3d_image
 from tests.testing_data.integration_answers import test_integration_value
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick

 TASK = "integration_segmentation_3d"

@@ -191,7 +191,7 @@ def run_inference_test(root_dir, device="cuda:0"):
         ]
     )
     val_ds = monai.data.Dataset(data=val_files, transform=val_transforms)
-    # sliding window inferene need to input 1 image in every iteration
+    # sliding window inference needs to input 1 image in every iteration
     val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
     val_post_tran = Compose([Activations(sigmoid=True), AsDiscrete(threshold_values=True)])
     dice_metric = DiceMetric(include_background=True, reduction="mean")
@@ -228,7 +228,7 @@ def run_inference_test(root_dir, device="cuda:0"):


 @skip_if_quick
-class IntegrationSegmentation3D(unittest.TestCase):
+class IntegrationSegmentation3D(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -240,47 +240,53 @@ def setUp(self):
             n = nib.Nifti1Image(seg, np.eye(4))
             nib.save(n, os.path.join(self.data_dir, f"seg{i:d}.nii.gz"))

-        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")
+        self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

     def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

-    def test_training(self):
-        repeated = []
-        for i in range(4):
-            set_determinism(0)
-
-            repeated.append([])
-            losses, best_metric, best_metric_epoch = run_training_test(
-                self.data_dir, device=self.device, cachedataset=i
-            )
+    def train_and_infer(self, idx=0):
+        results = []
+        set_determinism(0)
+        losses, best_metric, best_metric_epoch = run_training_test(self.data_dir, device=self.device, cachedataset=idx)
+        infer_metric = run_inference_test(self.data_dir, device=self.device)

-            # check training properties
-            print("losses", losses)
-            self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
-            repeated[i].extend(losses)
-            print("best metric", best_metric)
-            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
-            repeated[i].append(best_metric)
-            self.assertTrue(len(glob(os.path.join(self.data_dir, "runs"))) > 0)
-            model_file = os.path.join(self.data_dir, "best_metric_model.pth")
-            self.assertTrue(os.path.exists(model_file))
+        # check training properties
+        print("losses", losses)
+        print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
+        self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
+        self.assertTrue(len(glob(os.path.join(self.data_dir, "runs"))) > 0)
+        model_file = os.path.join(self.data_dir, "best_metric_model.pth")
+        self.assertTrue(os.path.exists(model_file))

-            infer_metric = run_inference_test(self.data_dir, device=self.device)
+        # check inference properties
+        self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
+        output_files = sorted(glob(os.path.join(self.data_dir, "output", "img*", "*.nii.gz")))
+        print([np.mean(nib.load(output).get_fdata()) for output in output_files])
+        results.extend(losses)
+        results.append(best_metric)
+        results.append(infer_metric)
+        for output in output_files:
+            ave = np.mean(nib.load(output).get_fdata())
+            results.append(ave)
+        self.assertTrue(test_integration_value(TASK, key="output_sums", data=results[8:], rtol=1e-2))
+        return results

-            # check inference properties
-            print("infer metric", infer_metric)
-            self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
-            repeated[i].append(infer_metric)
-            output_files = sorted(glob(os.path.join(self.data_dir, "output", "img*", "*.nii.gz")))
-            print([np.mean(nib.load(output).get_fdata()) for output in output_files])
-            for output in output_files:
-                ave = np.mean(nib.load(output).get_fdata())
-                repeated[i].append(ave)
-            self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][8:], rtol=1e-2))
+    def test_training(self):
+        repeated = []
+        for i in range(4):
+            results = self.train_and_infer(i)
+            repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])
         np.testing.assert_allclose(repeated[0], repeated[2])
+        np.testing.assert_allclose(repeated[0], repeated[3])
+
+    @TimedCall(seconds=180, daemon=False)
+    def test_timing(self):
+        self.train_and_infer(idx=3)


 if __name__ == "__main__":
diff --git a/tests/test_integration_sliding_window.py b/tests/test_integration_sliding_window.py
index 07abdf722e..74c6a82350 100644
--- a/tests/test_integration_sliding_window.py
+++ b/tests/test_integration_sliding_window.py
@@ -26,7 +26,7 @@
 from monai.networks.nets import UNet
 from monai.transforms import AddChannel
 from monai.utils import set_determinism
-from tests.utils import make_nifti_image, skip_if_quick
+from tests.utils import DistTestCase, TimedCall, make_nifti_image, skip_if_quick


 def run_test(batch_size, img_name, seg_name, output_dir, device="cuda:0"):
@@ -60,7 +60,7 @@ def _sliding_window_processor(_engine, batch):


 @skip_if_quick
-class TestIntegrationSlidingWindow(unittest.TestCase):
+class TestIntegrationSlidingWindow(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -76,7 +76,9 @@ def tearDown(self):
         if os.path.exists(self.seg_name):
             os.remove(self.seg_name)

+    @TimedCall(seconds=10)
     def test_training(self):
+        set_determinism(seed=0)
         with tempfile.TemporaryDirectory() as tempdir:
             output_file = run_test(
                 batch_size=2, img_name=self.img_name, seg_name=self.seg_name, output_dir=tempdir, device=self.device
diff --git a/tests/test_integration_stn.py b/tests/test_integration_stn.py
index 4f9f0fafd3..c8759e5f42 100644
--- a/tests/test_integration_stn.py
+++ b/tests/test_integration_stn.py
@@ -22,6 +22,7 @@
 from monai.data import create_test_image_2d
 from monai.networks.layers import AffineTransform
 from monai.utils import set_determinism
+from tests.utils import DistTestCase, TimedCall


 class STNBenchmark(nn.Module):
@@ -96,13 +97,14 @@ def compare_2d(is_ref=True, device=None, reverse_indexing=False):
     return model(img_a).detach().cpu().numpy(), loss.item(), init_loss


-class TestSpatialTransformerCore(unittest.TestCase):
+class TestSpatialTransformerCore(DistTestCase):
     def setUp(self):
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
         set_determinism(seed=None)

+    @TimedCall(seconds=60)
     def test_training(self):
         """
         check that the quality AffineTransform backpropagation
diff --git a/tests/test_integration_unet_2d.py b/tests/test_integration_unet_2d.py
index b69d137d77..435fb3446f 100644
--- a/tests/test_integration_unet_2d.py
+++ b/tests/test_integration_unet_2d.py
@@ -19,7 +19,7 @@
 from monai.data import create_test_image_2d
 from monai.losses import DiceLoss
 from monai.networks.nets import BasicUNet, UNet
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick


 def run_test(net_name="basicunet", batch_size=64, train_steps=100, device="cuda:0"):
@@ -51,7 +51,8 @@ def __len__(self):


 @skip_if_quick
-class TestIntegrationUnet2D(unittest.TestCase):
+class TestIntegrationUnet2D(DistTestCase):
+    @TimedCall(seconds=20, daemon=False)
     def test_unet_training(self):
         for n in ["basicunet", "unet"]:
             loss = run_test(net_name=n, device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0"))
diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py
index 969e752089..8d117ddb5a 100644
--- a/tests/test_integration_workflows.py
+++ b/tests/test_integration_workflows.py
@@ -15,6 +15,7 @@
 import sys
 import tempfile
 import unittest
+import warnings
 from glob import glob

 import nibabel as nib
@@ -51,12 +52,12 @@
 )
 from monai.utils import set_determinism
 from tests.testing_data.integration_answers import test_integration_value
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick

 TASK = "integration_workflows"


-def run_training_test(root_dir, device="cuda:0", amp=False):
+def run_training_test(root_dir, device="cuda:0", amp=False, num_workers=4):
     images = sorted(glob(os.path.join(root_dir, "img*.nii.gz")))
     segs = sorted(glob(os.path.join(root_dir, "seg*.nii.gz")))
     train_files = [{"image": img, "label": seg} for img, seg in zip(images[:20], segs[:20])]
@@ -87,10 +88,10 @@ def run_training_test(root_dir, device="cuda:0", amp=False):
     # create a training data loader
     train_ds = monai.data.CacheDataset(data=train_files, transform=train_transforms, cache_rate=0.5)
     # use batch_size=2 to load images and use RandCropByPosNegLabeld to generate 2 x 4 images for network training
-    train_loader = monai.data.DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=4)
+    train_loader = monai.data.DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=num_workers)
     # create a validation data loader
     val_ds = monai.data.CacheDataset(data=val_files, transform=val_transforms, cache_rate=1.0)
-    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
+    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=num_workers)

     # create UNet, DiceLoss and Adam optimizer
     net = monai.networks.nets.UNet(
@@ -168,7 +169,7 @@ def run_training_test(root_dir, device="cuda:0", amp=False):
     return evaluator.state.best_metric


-def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):
+def run_inference_test(root_dir, model_file, device="cuda:0", amp=False, num_workers=4):
     images = sorted(glob(os.path.join(root_dir, "im*.nii.gz")))
     segs = sorted(glob(os.path.join(root_dir, "seg*.nii.gz")))
     val_files = [{"image": img, "label": seg} for img, seg in zip(images, segs)]
@@ -185,7 +186,7 @@ def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):

     # create a validation data loader
     val_ds = monai.data.Dataset(data=val_files, transform=val_transforms)
-    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
+    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=num_workers)

     # create UNet, DiceLoss and Adam optimizer
     net = monai.networks.nets.UNet(
@@ -233,7 +234,7 @@ def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):


 @skip_if_quick
-class IntegrationWorkflows(unittest.TestCase):
+class IntegrationWorkflows(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -253,41 +254,58 @@ def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

+    def train_and_infer(self, idx=0):
+        results = []
+        set_determinism(seed=0)
+        best_metric = run_training_test(self.data_dir, device=self.device, amp=(idx == 2))
+        model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
+        infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(idx == 2))
+
+        print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
+        # check inference properties
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
+        results.append(best_metric)
+        results.append(infer_metric)
+        output_files = sorted(glob(os.path.join(self.data_dir, "img*", "*.nii.gz")))
+        for output in output_files:
+            ave = np.mean(nib.load(output).get_fdata())
+            results.append(ave)
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="output_sums_2", data=results[2:], rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="output_sums", data=results[2:], rtol=1e-2))
+        try:
+            os.remove(model_file)
+        except Exception as e:
+            warnings.warn(f"Failed to remove {model_file}: {e}.")
+        if torch.cuda.is_available():
+            try:
+                torch.cuda.empty_cache()
+            except Exception:
+                pass
+
+        return results
+
     def test_training(self):
         repeated = []
         test_rounds = 3 if monai.config.get_torch_version_tuple() >= (1, 6) else 2
         for i in range(test_rounds):
-            set_determinism(seed=0)
-
-            repeated.append([])
-            best_metric = run_training_test(self.data_dir, device=self.device, amp=(i == 2))
-            print("best metric", best_metric)
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
-            repeated[i].append(best_metric)
-
-            model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
-            infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2))
-            print("infer metric", infer_metric)
-            # check inference properties
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
-            repeated[i].append(infer_metric)
-
-            output_files = sorted(glob(os.path.join(self.data_dir, "img*", "*.nii.gz")))
-            for output in output_files:
-                ave = np.mean(nib.load(output).get_fdata())
-                repeated[i].append(ave)
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="output_sums_2", data=repeated[i][2:], rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][2:], rtol=1e-2))
+            results = self.train_and_infer(idx=i)
+            repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])

+    @TimedCall(seconds=300, skip_timing=not torch.cuda.is_available(), daemon=False)
+    def test_timing(self):
+        self.train_and_infer(idx=2)
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_integration_workflows_gan.py b/tests/test_integration_workflows_gan.py
index 922511ab84..56dd8b93e0 100644
--- a/tests/test_integration_workflows_gan.py
+++ b/tests/test_integration_workflows_gan.py
@@ -30,7 +30,7 @@
 from monai.networks.nets import Discriminator, Generator
 from monai.transforms import AsChannelFirstd, Compose, LoadNiftid, RandFlipd, ScaleIntensityd, ToTensord
 from monai.utils import set_determinism
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick


 def run_training_test(root_dir, device="cuda:0"):
@@ -127,7 +127,7 @@ def generator_loss(gen_images):


 @skip_if_quick
-class IntegrationWorkflowsGAN(unittest.TestCase):
+class IntegrationWorkflowsGAN(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -145,6 +145,7 @@ def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

+    @TimedCall(seconds=100, daemon=False)
     def test_training(self):
         torch.manual_seed(0)
diff --git a/tests/test_iterable_dataset.py b/tests/test_iterable_dataset.py
index 53f1267a2a..5a7345905e 100644
--- a/tests/test_iterable_dataset.py
+++ b/tests/test_iterable_dataset.py
@@ -38,12 +38,14 @@ def __next__(self):
         data = None
         # support multi-process access to the database
         lock.acquire()
+        count = 0
         with open(self.dbpath) as f:
             count = json.load(f)["count"]
             if count > 0:
                 data = self.data[count - 1]
-        with open(self.dbpath, "w") as f:
-            json.dump({"count": count - 1}, f)
+        if count > 0:
+            with open(self.dbpath, "w") as f:
+                json.dump({"count": count - 1}, f)
         lock.release()

         if count == 0:
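The test_iterable_dataset.py hunk above hardens the shared-counter database used for multi-worker access: count is initialized before the read so it is always defined, and the decrement is only written back while the counter is positive, so it can no longer go negative once the source is exhausted. The guarded read-modify-write in isolation (hypothetical helper, for illustration only):

    import json


    def pop_one(dbpath):
        # read the shared counter and decrement it, never below zero;
        # a return value of 0 signals that the data source is exhausted
        count = 0
        with open(dbpath) as f:
            count = json.load(f)["count"]
        if count > 0:
            with open(dbpath, "w") as f:
                json.dump({"count": count - 1}, f)
        return count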
diff --git a/tests/test_timedcall.py b/tests/test_timedcall.py
new file mode 100644
index 0000000000..aa9d170d85
--- /dev/null
+++ b/tests/test_timedcall.py
@@ -0,0 +1,74 @@
+# Copyright 2020 MONAI Consortium
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#     http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import unittest
+
+from tests.utils import TimedCall
+
+
+@TimedCall(seconds=10, force_quit=False)
+def case_1_seconds(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, skip_timing=True, force_quit=True)
+def case_1_seconds_skip(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=True)
+def case_1_seconds_timeout(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=False)
+def case_1_seconds_timeout_warning(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=True)
+def case_1_seconds_bad(arg=None):
+    time.sleep(1)
+    assert 0 == 1, "wrong case"
+
+
+class TestTimedCall(unittest.TestCase):
+    def test_good_call(self):
+        output = case_1_seconds()
+        self.assertEqual(output, "good")
+
+    def test_skip_timing(self):
+        output = case_1_seconds_skip("testing")
+        self.assertEqual(output, "testing")
+
+    def test_timeout(self):
+        with self.assertRaises(multiprocessing.TimeoutError):
+            case_1_seconds_timeout()
+
+    def test_timeout_not_force_quit(self):
+        with self.assertWarns(Warning):
+            with self.assertRaises(multiprocessing.TimeoutError):
+                case_1_seconds_timeout_warning()
+
+    def test_timeout_bad(self):
+        # timeout before the method's error
+        with self.assertRaises(multiprocessing.TimeoutError):
+            case_1_seconds_bad()
+
+
+if __name__ == "__main__":
+    unittest.main()
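The new test file pins down the decorator's contract: on success the child's return value is handed back to the caller, an exception raised in the child is re-raised in the parent (with its traceback, see tests/utils.py below), exceeding the deadline raises multiprocessing.TimeoutError, and force_quit=False additionally emits a warning while waiting for the child to finish. A usage sketch under those assumptions (the function and values are illustrative):

    import time

    from tests.utils import TimedCall


    @TimedCall(seconds=5, force_quit=True)
    def double(x):
        time.sleep(0.2)  # well inside the 5s deadline
        return 2 * x


    if __name__ == "__main__":
        assert double(21) == 42  # the child's result is returned to the caller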
diff --git a/tests/utils.py b/tests/utils.py
index c5ba76cdab..3ab73a4fcd 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -13,9 +13,12 @@
 import functools
 import importlib
 import os
+import queue
 import sys
 import tempfile
+import traceback
 import unittest
+import warnings
 from io import BytesIO
 from subprocess import PIPE, Popen
 from typing import Optional
@@ -93,32 +96,23 @@ def make_nifti_image(array, affine=None):


 class DistTestCase(unittest.TestCase):
-    """testcase without _outcome, so that it's picklable."""
-
-    original_mp = None
-
-    def setUp(self) -> None:
-        self.original_mp = torch.multiprocessing.get_start_method(allow_none=True)
-        try:
-            torch.multiprocessing.set_start_method("spawn", force=True)
-        except RuntimeError:
-            pass
-
-    def tearDown(self) -> None:
-        try:
-            torch.multiprocessing.set_start_method(str(self.original_mp), force=True)
-        except RuntimeError:
-            pass
+    """
+    testcase without _outcome, so that it's picklable.
+    """

     def __getstate__(self):
         self_dict = self.__dict__.copy()
         del self_dict["_outcome"]
         return self_dict

+    def __setstate__(self, data_dict):
+        self.__dict__.update(data_dict)
+

 class DistCall:
     """
     Wrap a test case so that it will run in multiple processes on a single machine using `torch.distributed`.
+    It is designed to be used with `tests.utils.DistTestCase`.

     Usage:

@@ -145,6 +139,8 @@ def __init__(
         timeout=60,
         init_method=None,
         backend: Optional[str] = None,
+        daemon: Optional[bool] = None,
+        method: Optional[str] = "spawn",
         verbose: bool = False,
     ):
         """
@@ -156,9 +152,14 @@ def __init__(
             master_port: Master node (rank 0)'s free port.
             node_rank: The rank of the node, this could be set via environment variable "NODE_RANK".
             timeout: Timeout for operations executed against the process group.
-            init_method: URL specifying how to initialize the process group. Default is "env://" if unspecified.
+            init_method: URL specifying how to initialize the process group.
+                Default is "env://" or "file:///d:/a_temp" (windows) if unspecified.
             backend: The backend to use. Depending on build-time configurations,
                 valid values include ``mpi``, ``gloo``, and ``nccl``.
+            daemon: the process's daemon flag.
+                When daemon=None, the initial value is inherited from the creating process.
+            method: the method used to start the child processes;
+                one of 'fork', 'spawn' or 'forkserver'.
             verbose: whether to print NCCL debug info.
         """
         self.nnodes = int(nnodes)
@@ -175,6 +176,9 @@ def __init__(
         if self.init_method is None and sys.platform == "win32":
             self.init_method = "file:///d:/a_temp"
         self.timeout = datetime.timedelta(0, timeout)
+        self.daemon = daemon
+        self.method = method
+        self._original_method = torch.multiprocessing.get_start_method(allow_none=False)
         self.verbose = verbose

     def run_process(self, func, local_rank, args, kwargs, results):
@@ -219,6 +223,11 @@ def __call__(self, obj):

         @functools.wraps(obj)
         def _wrapper(*args, **kwargs):
+            if self.method:
+                try:
+                    torch.multiprocessing.set_start_method(self.method, force=True)
+                except (RuntimeError, ValueError):
+                    pass
             processes = []
             results = torch.multiprocessing.Queue()
             func = _call_original_func
@@ -227,15 +236,130 @@ def _wrapper(*args, **kwargs):
                 p = torch.multiprocessing.Process(
                     target=self.run_process, args=(func, proc_rank, args, kwargs, results)
                 )
+                if self.daemon is not None:
+                    p.daemon = self.daemon
                 p.start()
                 processes.append(p)
             for p in processes:
                 p.join()
+            if self.method:
+                try:
+                    torch.multiprocessing.set_start_method(self._original_method, force=True)
+                except (RuntimeError, ValueError):
+                    pass
             assert results.get(), "Distributed call failed."

         return _wrapper


+class TimedCall:
+    """
+    Wrap a test case so that it will run in a new process, raising a TimeoutError if the decorated method takes
+    more than `seconds` to finish. It is designed to be used with `tests.utils.DistTestCase`.
+    """
+
+    def __init__(
+        self,
+        seconds: float = 60.0,
+        daemon: Optional[bool] = None,
+        method: Optional[str] = "spawn",
+        force_quit: bool = True,
+        skip_timing: bool = False,
+    ):
+        """
+
+        Args:
+            seconds: timeout seconds.
+            daemon: the process's daemon flag.
+                When daemon=None, the initial value is inherited from the creating process.
+            method: the method used to start the child process;
+                one of 'fork', 'spawn' or 'forkserver'.
+            force_quit: whether to terminate the child process once `seconds` have elapsed.
+            skip_timing: whether to skip the timing constraint;
+                useful when the decision depends on system conditions such as
+                `torch.cuda.is_available()`.
+        """
+        self.timeout_seconds = seconds
+        self.daemon = daemon
+        self.force_quit = force_quit
+        self.skip_timing = skip_timing
+        self.method = method
+        self._original_method = torch.multiprocessing.get_start_method(allow_none=False)  # remember the default method
+
+    @staticmethod
+    def run_process(func, args, kwargs, results):
+        try:
+            output = func(*args, **kwargs)
+            results.put(output)
+        except Exception as e:
+            e.traceback = traceback.format_exc()
+            results.put(e)
+
+    def __call__(self, obj):
+
+        if self.skip_timing:
+            return obj
+
+        _cache_original_func(obj)
+
+        @functools.wraps(obj)
+        def _wrapper(*args, **kwargs):
+
+            if self.method:
+                try:
+                    torch.multiprocessing.set_start_method(self.method, force=True)
+                except (RuntimeError, ValueError):
+                    pass
+            func = _call_original_func
+            args = [obj.__name__, obj.__module__] + list(args)
+            results = torch.multiprocessing.Queue()
+            p = torch.multiprocessing.Process(target=TimedCall.run_process, args=(func, args, kwargs, results))
+            if self.daemon is not None:
+                p.daemon = self.daemon
+            p.start()
+
+            p.join(timeout=self.timeout_seconds)
+
+            timeout_error = None
+            try:
+                if p.is_alive():
+                    # create an exception to raise once the child has been cleaned up
+                    timeout_error = torch.multiprocessing.TimeoutError(
+                        f"'{obj.__name__}' in '{obj.__module__}' did not finish in {self.timeout_seconds}s."
+                    )
+                    if self.force_quit:
+                        p.terminate()
+                    else:
+                        warnings.warn(
+                            f"TimedCall: deadline ({self.timeout_seconds}s) "
+                            f"reached but waiting for {obj.__name__} to finish."
+                        )
+            finally:
+                p.join()
+
+            res = None
+            try:
+                res = results.get(block=False)
+            except queue.Empty:  # no result returned, took too long
+                pass
+            finally:
+                if self.method:
+                    try:
+                        torch.multiprocessing.set_start_method(self._original_method, force=True)
+                    except (RuntimeError, ValueError):
+                        pass
+            if isinstance(res, Exception):  # other errors from obj
+                if hasattr(res, "traceback"):
+                    raise RuntimeError(res.traceback) from res
+                else:
+                    raise res
+            if timeout_error:  # the deadline was reached, with or without force_quit
+                raise timeout_error
+            return res
+
+        return _wrapper
+
+
 _original_funcs = {}


@@ -299,7 +423,7 @@ def setUp(self):
         self.segn = torch.tensor(self.segn)


-def test_script_save(net, *inputs, eval_nets=True, device=None):
+def test_script_save(net, *inputs, eval_nets=True, device=None, rtol=1e-4):
     """
     Test the ability to save `net` as a Torchscript object, reload it, and apply inference. The value `inputs` is
     forward-passed through the original and loaded copy of the network and their results returned. Both `net` and its
@@ -344,7 +468,7 @@ def test_script_save(net, *inputs, eval_nets=True, device=None):
         np.testing.assert_allclose(
             r1.detach().cpu().numpy(),
             r2.detach().cpu().numpy(),
-            rtol=1e-5,
+            rtol=rtol,
             atol=0,
             err_msg=f"failed on comparison number: {i}",
         )