From 4b2579e32802eb506b946ec6912f5d690459b6d2 Mon Sep 17 00:00:00 2001
From: Wenqi Li
Date: Fri, 27 Nov 2020 19:29:34 +0000
Subject: [PATCH 1/7] closes #852 adds a timeout call decorator

Signed-off-by: Wenqi Li
---
 tests/test_highresnet.py              |   5 +-
 tests/test_integration_determinism.py |   8 +-
 tests/test_integration_stn.py         |   6 +-
 tests/test_timedcall.py               |  74 +++++++++++++++++
 tests/utils.py                        | 111 ++++++++++++++++++++++++--
 5 files changed, 194 insertions(+), 10 deletions(-)
 create mode 100644 tests/test_timedcall.py

diff --git a/tests/test_highresnet.py b/tests/test_highresnet.py
index 1e6c973b2d..6a4b129588 100644
--- a/tests/test_highresnet.py
+++ b/tests/test_highresnet.py
@@ -15,7 +15,7 @@
 from parameterized import parameterized

 from monai.networks.nets import HighResNet
-from tests.utils import test_script_save
+from tests.utils import DistTestCase, TimedCall, test_script_save

 device = "cuda" if torch.cuda.is_available() else "cpu"
@@ -44,7 +44,7 @@
 ]


-class TestHighResNet(unittest.TestCase):
+class TestHighResNet(DistTestCase):
     @parameterized.expand([TEST_CASE_1, TEST_CASE_2, TEST_CASE_3, TEST_CASE_4])
     def test_shape(self, input_param, input_shape, expected_shape):
         net = HighResNet(**input_param).to(device)
@@ -53,6 +53,7 @@ def test_shape(self, input_param, input_shape, expected_shape):
         result = net.forward(torch.randn(input_shape).to(device))
         self.assertEqual(result.shape, expected_shape)

+    @TimedCall(seconds=100, force_quit=True)
     def test_script(self):
         input_param, input_shape, expected_shape = TEST_CASE_1
         net = HighResNet(**input_param)
diff --git a/tests/test_integration_determinism.py b/tests/test_integration_determinism.py
index 552fe11394..a52c3b0c45 100644
--- a/tests/test_integration_determinism.py
+++ b/tests/test_integration_determinism.py
@@ -20,6 +20,7 @@
 from monai.networks.nets import UNet
 from monai.transforms import AddChannel, Compose, RandRotate90, RandSpatialCrop, ScaleIntensity, ToTensor
 from monai.utils import set_determinism
+from tests.utils import DistTestCase, TimedCall


 def run_test(batch_size=64, train_steps=200, device="cuda:0"):
@@ -67,15 +68,18 @@ def __len__(self):
     return epoch_loss, step


-class TestDeterminism(unittest.TestCase):
+class TestDeterminism(DistTestCase):
     def setUp(self):
-        set_determinism(seed=0)
+        super().setUp()
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
+        super().tearDown()
         set_determinism(seed=None)

+    @TimedCall(seconds=30)
     def test_training(self):
+        set_determinism(seed=0)
         loss, step = run_test(device=self.device)
         print(f"Deterministic loss {loss} at training step {step}")
         np.testing.assert_allclose(step, 4)
diff --git a/tests/test_integration_stn.py b/tests/test_integration_stn.py
index 4f9f0fafd3..b7b4be7ae4 100644
--- a/tests/test_integration_stn.py
+++ b/tests/test_integration_stn.py
@@ -22,6 +22,7 @@
 from monai.data import create_test_image_2d
 from monai.networks.layers import AffineTransform
 from monai.utils import set_determinism
+from tests.utils import DistTestCase, TimedCall


 class STNBenchmark(nn.Module):
@@ -96,13 +97,16 @@ def compare_2d(is_ref=True, device=None, reverse_indexing=False):
     return model(img_a).detach().cpu().numpy(), loss.item(), init_loss


-class TestSpatialTransformerCore(unittest.TestCase):
+class TestSpatialTransformerCore(DistTestCase):
     def setUp(self):
+        super().setUp()
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
+        super().tearDown()
         set_determinism(seed=None)

+    @TimedCall(seconds=60)
     def test_training(self):
         """
         check that the quality AffineTransform backpropagation
diff --git a/tests/test_timedcall.py b/tests/test_timedcall.py
new file mode 100644
index 0000000000..aa9d170d85
--- /dev/null
+++ b/tests/test_timedcall.py
@@ -0,0 +1,74 @@
+# Copyright 2020 MONAI Consortium
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#     http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import unittest
+
+from tests.utils import TimedCall
+
+
+@TimedCall(seconds=10, force_quit=False)
+def case_1_seconds(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, skip_timing=True, force_quit=True)
+def case_1_seconds_skip(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=True)
+def case_1_seconds_timeout(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=False)
+def case_1_seconds_timeout_warning(arg=None):
+    time.sleep(1)
+    return "good" if not arg else arg
+
+
+@TimedCall(seconds=0.1, force_quit=True)
+def case_1_seconds_bad(arg=None):
+    time.sleep(1)
+    assert 0 == 1, "wrong case"
+
+
+class TestTimedCall(unittest.TestCase):
+    def test_good_call(self):
+        output = case_1_seconds()
+        self.assertEqual(output, "good")
+
+    def test_skip_timing(self):
+        output = case_1_seconds_skip("testing")
+        self.assertEqual(output, "testing")
+
+    def test_timeout(self):
+        with self.assertRaises(multiprocessing.TimeoutError):
+            case_1_seconds_timeout()
+
+    def test_timeout_not_force_quit(self):
+        with self.assertWarns(Warning):
+            with self.assertRaises(multiprocessing.TimeoutError):
+                case_1_seconds_timeout_warning()
+
+    def test_timeout_bad(self):
+        # timeout before the method's error
+        with self.assertRaises(multiprocessing.TimeoutError):
+            case_1_seconds_bad()
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/utils.py b/tests/utils.py
index c5ba76cdab..b189d8fc4c 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -13,9 +13,12 @@
 import functools
 import importlib
 import os
+import queue
 import sys
 import tempfile
+import traceback
 import unittest
+import warnings
 from io import BytesIO
 from subprocess import PIPE, Popen
 from typing import Optional
@@ -93,12 +96,15 @@ def make_nifti_image(array, affine=None):


 class DistTestCase(unittest.TestCase):
-    """testcase without _outcome, so that it's picklable."""
+    """
+    testcase without _outcome, so that it's picklable.
+    set the multiprocessing method to spawn, so that it works across the platforms.
+ """ original_mp = None def setUp(self) -> None: - self.original_mp = torch.multiprocessing.get_start_method(allow_none=True) + self.original_mp = torch.multiprocessing.get_start_method(allow_none=False) try: torch.multiprocessing.set_start_method("spawn", force=True) except RuntimeError: @@ -106,8 +112,8 @@ def setUp(self) -> None: def tearDown(self) -> None: try: - torch.multiprocessing.set_start_method(str(self.original_mp), force=True) - except RuntimeError: + torch.multiprocessing.set_start_method(self.original_mp, force=True) # type: ignore [arg-type] + except (RuntimeError, ValueError): pass def __getstate__(self): @@ -119,6 +125,7 @@ def __getstate__(self): class DistCall: """ Wrap a test case so that it will run in multiple processes on a single machine using `torch.distributed`. + It is designed to be used with `tests.utils.DistTestCase`. Usage: @@ -156,7 +163,8 @@ def __init__( master_port: Master node (rank 0)'s free port. node_rank: The rank of the node, this could be set via environment variable "NODE_RANK". timeout: Timeout for operations executed against the process group. - init_method: URL specifying how to initialize the process group. Default is "env://" if unspecified. + init_method: URL specifying how to initialize the process group. + Default is "env://" or "file:///d:/a_temp" (windows) if unspecified. backend: The backend to use. Depending on build-time configurations, valid values include ``mpi``, ``gloo``, and ``nccl``. verbose: whether to print NCCL debug info. @@ -236,6 +244,99 @@ def _wrapper(*args, **kwargs): return _wrapper +class TimedCall: + """ + Wrap a test case so that it will run in a new process, raises a TimeoutError if the decorated method takes + more than `seconds` to finish. It is designed to be used with `tests.utils.DistTestCase`. + """ + + def __init__( + self, + seconds: float = 60.0, + daemon: bool = True, + method: Optional[str] = None, + force_quit: bool = True, + skip_timing=False, + ): + """ + + Args: + seconds: timeout seconds. + daemon: the process’s daemon lag. + method: set the method which should be used to start a child process. + method can be 'fork', 'spawn' or 'forkserver'. + force_quit: whether to terminate the child process when `seconds` elapsed. + skip_timing: whether to skip the timing constraint. + this is useful to include some system conditions such as + `torch.cuda.is_available()`. + """ + self.timeout_seconds = seconds + self.daemon = bool(daemon) + self.force_quit = force_quit + self.skip_timing = skip_timing + self.method = method + self._original_method = torch.multiprocessing.get_start_method(allow_none=False) # remember the original method + + @staticmethod + def run_process(func, args, kwargs, results): + try: + output = func(*args, **kwargs) + results.put(output) + except Exception as e: + e.traceback = traceback.format_exc() + results.put(e) + + def __call__(self, obj): + + if self.skip_timing: + return obj + + _cache_original_func(obj) + + @functools.wraps(obj) + def _wrapper(*args, **kwargs): + func = _call_original_func + args = [obj.__name__, obj.__module__] + list(args) + results = torch.multiprocessing.Queue() + p = torch.multiprocessing.Process(target=TimedCall.run_process, args=(func, args, kwargs, results)) + p.daemon = self.daemon + p.start() + + p.join(timeout=self.timeout_seconds) + + timeout_error = None + try: + if p.is_alive(): + # create an Exception + timeout_error = torch.multiprocessing.TimeoutError( + f"'{obj.__name__}' in '{obj.__module__}' did not finish in {self.timeout_seconds}s." 
+ ) + if self.force_quit: + p.terminate() + else: + warnings.warn( + f"TimedCall: deadline ({self.timeout_seconds}s) " + f"reached but waiting for {obj.__name__} to finish." + ) + finally: + p.join() + + try: + res = results.get(block=False) + except queue.Empty: # no result returned, took too long + res = None + if isinstance(res, Exception): # other errors from obj + if hasattr(res, "traceback"): + raise RuntimeError(res.traceback) from res + else: + raise res + if timeout_error: # no force_quit finished + raise timeout_error + return res + + return _wrapper + + _original_funcs = {} From 616d08695d7f9d2da4fd9926ab9785f5473da14c Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Sun, 29 Nov 2020 17:10:42 +0000 Subject: [PATCH 2/7] adds integration timing Signed-off-by: Wenqi Li --- tests/test_integration_classification_2d.py | 44 +++++++++++--- tests/test_integration_determinism.py | 2 - tests/test_integration_segmentation_3d.py | 28 ++++++++- tests/test_integration_sliding_window.py | 6 +- tests/test_integration_stn.py | 2 - tests/test_integration_workflows.py | 30 +++++++--- tests/utils.py | 66 ++++++++++++++------- 7 files changed, 130 insertions(+), 48 deletions(-) diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py index ba08a40b56..f5901f7ddb 100644 --- a/tests/test_integration_classification_2d.py +++ b/tests/test_integration_classification_2d.py @@ -25,7 +25,7 @@ from monai.transforms import AddChannel, Compose, LoadPNG, RandFlip, RandRotate, RandZoom, ScaleIntensity, ToTensor from monai.utils import set_determinism from tests.testing_data.integration_answers import test_integration_value -from tests.utils import skip_if_quick +from tests.utils import DistTestCase, TimedCall, skip_if_quick TEST_DATA_URL = "https://www.dropbox.com/s/5wwskxctvcxiuea/MedNIST.tar.gz?dl=1" MD5_VALUE = "0bc7306e7427e00ad1c5526a6677552d" @@ -45,7 +45,7 @@ def __getitem__(self, index): return self.transforms(self.image_files[index]), self.labels[index] -def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0"): +def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0", num_workers=10): monai.config.print_config() # define transforms for image and classification @@ -65,10 +65,10 @@ def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0") # create train, val data loaders train_ds = MedNISTDataset(train_x, train_y, train_transforms) - train_loader = DataLoader(train_ds, batch_size=300, shuffle=True, num_workers=10) + train_loader = DataLoader(train_ds, batch_size=300, shuffle=True, num_workers=num_workers) val_ds = MedNISTDataset(val_x, val_y, val_transforms) - val_loader = DataLoader(val_ds, batch_size=300, num_workers=10) + val_loader = DataLoader(val_ds, batch_size=300, num_workers=num_workers) model = densenet121(spatial_dims=2, in_channels=1, out_channels=len(np.unique(train_y))).to(device) loss_function = torch.nn.CrossEntropyLoss() @@ -127,11 +127,11 @@ def run_training_test(root_dir, train_x, train_y, val_x, val_y, device="cuda:0") return epoch_loss_values, best_metric, best_metric_epoch -def run_inference_test(root_dir, test_x, test_y, device="cuda:0"): +def run_inference_test(root_dir, test_x, test_y, device="cuda:0", num_workers=10): # define transforms for image and classification val_transforms = Compose([LoadPNG(image_only=True), AddChannel(), ScaleIntensity(), ToTensor()]) val_ds = MedNISTDataset(test_x, test_y, val_transforms) - val_loader = DataLoader(val_ds, batch_size=300, 
+    val_loader = DataLoader(val_ds, batch_size=300, num_workers=num_workers)

     model = densenet121(spatial_dims=2, in_channels=1, out_channels=len(np.unique(test_y))).to(device)
@@ -152,7 +152,7 @@


 @skip_if_quick
-class IntegrationClassification2D(unittest.TestCase):
+class IntegrationClassification2D(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)
         self.data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "testing_data")
@@ -198,7 +198,7 @@ def setUp(self):
             self.train_x.append(image_file_list[i])
             self.train_y.append(image_classes[i])

-        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")
+        self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

     def tearDown(self):
         set_determinism(seed=None)
@@ -240,6 +240,34 @@ def test_training(self):

         np.testing.assert_allclose(repeated[0], repeated[1])

+    @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available())
+    def test_timing(self):
+        if not os.path.exists(os.path.join(self.data_dir, "MedNIST")):
+            # skip test if no MedNIST dataset
+            return
+        set_determinism(seed=0)
+        # run training
+        losses, best_metric, best_metric_epoch = run_training_test(
+            self.data_dir,
+            self.train_x,
+            self.train_y,
+            self.val_x,
+            self.val_y,
+            device=self.device,
+            num_workers=0,
+        )
+        # check training properties
+        print(f"integration_classification_2d {losses}")
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
+        print("best metric", best_metric)
+        self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4))
+        np.testing.assert_allclose(best_metric_epoch, 4)
+
+        infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device, num_workers=0)
+        print("infer metric", infer_metric)
+        # check inference properties
+        self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1))
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_integration_determinism.py b/tests/test_integration_determinism.py
index a52c3b0c45..dbabc96da1 100644
--- a/tests/test_integration_determinism.py
+++ b/tests/test_integration_determinism.py
@@ -70,11 +70,9 @@ def __len__(self):

 class TestDeterminism(DistTestCase):
     def setUp(self):
-        super().setUp()
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
-        super().tearDown()
         set_determinism(seed=None)

     @TimedCall(seconds=30)
diff --git a/tests/test_integration_segmentation_3d.py b/tests/test_integration_segmentation_3d.py
index 2a7cc0de6d..3cf9503d82 100644
--- a/tests/test_integration_segmentation_3d.py
+++ b/tests/test_integration_segmentation_3d.py
@@ -40,7 +40,7 @@
 from monai.utils import set_determinism
 from monai.visualize import plot_2d_or_3d_image
 from tests.testing_data.integration_answers import test_integration_value
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick

 TASK = "integration_segmentation_3d"
@@ -228,7 +228,7 @@ def run_inference_test(root_dir, device="cuda:0"):


 @skip_if_quick
-class IntegrationSegmentation3D(unittest.TestCase):
+class IntegrationSegmentation3D(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -240,7 +240,7 @@ def setUp(self):
             n = nib.Nifti1Image(seg, np.eye(4))
             nib.save(n, os.path.join(self.data_dir, f"seg{i:d}.nii.gz"))

-        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")
+        self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

     def tearDown(self):
         set_determinism(seed=None)
@@ -282,6 +282,28 @@ def test_training(self):
         np.testing.assert_allclose(repeated[0], repeated[1])
         np.testing.assert_allclose(repeated[0], repeated[2])

+    @TimedCall(seconds=180, daemon=False)
+    def test_timing(self):
+        set_determinism(0)
+
+        # run training
+        losses, best_metric, best_metric_epoch = run_training_test(
+            self.data_dir,
+            device=self.device,
+            cachedataset=3,
+        )
+        # check training properties
+        print("losses", losses)
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
+        print("best metric", best_metric)
+        self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
+
+        # run inference
+        infer_metric = run_inference_test(self.data_dir, device=self.device)
+        # check inference properties
+        print("infer metric", infer_metric)
+        self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_integration_sliding_window.py b/tests/test_integration_sliding_window.py
index 07abdf722e..74c6a82350 100644
--- a/tests/test_integration_sliding_window.py
+++ b/tests/test_integration_sliding_window.py
@@ -26,7 +26,7 @@
 from monai.networks.nets import UNet
 from monai.transforms import AddChannel
 from monai.utils import set_determinism
-from tests.utils import make_nifti_image, skip_if_quick
+from tests.utils import DistTestCase, TimedCall, make_nifti_image, skip_if_quick


 def run_test(batch_size, img_name, seg_name, output_dir, device="cuda:0"):
@@ -60,7 +60,7 @@ def _sliding_window_processor(_engine, batch):


 @skip_if_quick
-class TestIntegrationSlidingWindow(unittest.TestCase):
+class TestIntegrationSlidingWindow(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -76,7 +76,9 @@ def tearDown(self):
         if os.path.exists(self.seg_name):
             os.remove(self.seg_name)

+    @TimedCall(seconds=10)
     def test_training(self):
+        set_determinism(seed=0)
         with tempfile.TemporaryDirectory() as tempdir:
             output_file = run_test(
                 batch_size=2, img_name=self.img_name, seg_name=self.seg_name, output_dir=tempdir, device=self.device
diff --git a/tests/test_integration_stn.py b/tests/test_integration_stn.py
index b7b4be7ae4..c8759e5f42 100644
--- a/tests/test_integration_stn.py
+++ b/tests/test_integration_stn.py
@@ -99,11 +99,9 @@ def compare_2d(is_ref=True, device=None, reverse_indexing=False):

 class TestSpatialTransformerCore(DistTestCase):
     def setUp(self):
-        super().setUp()
         self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")

     def tearDown(self):
-        super().tearDown()
         set_determinism(seed=None)

     @TimedCall(seconds=60)
diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py
index 969e752089..ad9f827d80 100644
--- a/tests/test_integration_workflows.py
+++ b/tests/test_integration_workflows.py
@@ -51,12 +51,12 @@
 )
 from monai.utils import set_determinism
 from tests.testing_data.integration_answers import test_integration_value
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick

 TASK = "integration_workflows"


-def run_training_test(root_dir, device="cuda:0", amp=False):
+def run_training_test(root_dir, device="cuda:0", amp=False, num_workers=4):
     images = sorted(glob(os.path.join(root_dir, "img*.nii.gz")))
     segs = sorted(glob(os.path.join(root_dir, "seg*.nii.gz")))
     train_files = [{"image": img, "label": seg} for img, seg in zip(images[:20], segs[:20])]
@@ -87,10 +87,10 @@ def run_training_test(root_dir, device="cuda:0", amp=False):
     # create a training data loader
     train_ds = monai.data.CacheDataset(data=train_files, transform=train_transforms, cache_rate=0.5)
     # use batch_size=2 to load images and use RandCropByPosNegLabeld to generate 2 x 4 images for network training
-    train_loader = monai.data.DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=4)
+    train_loader = monai.data.DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=num_workers)
     # create a validation data loader
     val_ds = monai.data.CacheDataset(data=val_files, transform=val_transforms, cache_rate=1.0)
-    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
+    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=num_workers)

     # create UNet, DiceLoss and Adam optimizer
     net = monai.networks.nets.UNet(
@@ -168,7 +168,7 @@ def run_training_test(root_dir, device="cuda:0", amp=False):
     return evaluator.state.best_metric


-def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):
+def run_inference_test(root_dir, model_file, device="cuda:0", amp=False, num_workers=4):
     images = sorted(glob(os.path.join(root_dir, "im*.nii.gz")))
     segs = sorted(glob(os.path.join(root_dir, "seg*.nii.gz")))
     val_files = [{"image": img, "label": seg} for img, seg in zip(images, segs)]
@@ -185,7 +185,7 @@ def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):

     # create a validation data loader
     val_ds = monai.data.Dataset(data=val_files, transform=val_transforms)
-    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
+    val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=num_workers)

     # create UNet, DiceLoss and Adam optimizer
     net = monai.networks.nets.UNet(
@@ -233,7 +233,7 @@ def run_inference_test(root_dir, model_file, device="cuda:0", amp=False):


 @skip_if_quick
-class IntegrationWorkflows(unittest.TestCase):
+class IntegrationWorkflows(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -245,7 +245,7 @@ def setUp(self):
             n = nib.Nifti1Image(seg, np.eye(4))
             nib.save(n, os.path.join(self.data_dir, f"seg{i:d}.nii.gz"))

-        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0")
+        self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

         monai.config.print_config()
         logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -288,6 +288,20 @@ def test_training(self):
             self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][2:], rtol=1e-2))
         np.testing.assert_allclose(repeated[0], repeated[1])

+    @TimedCall(seconds=100, skip_timing=not torch.cuda.is_available())
+    def test_timing(self):
+        set_determinism(seed=0)
+
+        best_metric = run_training_test(self.data_dir, device=self.device, amp=True, num_workers=0)
+        print("best metric", best_metric)
+        self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
+
+        model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
+        infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2), , num_workers=0)
+        print("infer metric", infer_metric)
+        # check inference properties
+        self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/utils.py b/tests/utils.py
index b189d8fc4c..283ecaed72 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -98,24 +98,8 @@ def make_nifti_image(array, affine=None):

 class DistTestCase(unittest.TestCase):
     """
     testcase without _outcome, so that it's picklable.
-    set the multiprocessing method to spawn, so that it works across the platforms.
     """

-    original_mp = None
-
-    def setUp(self) -> None:
-        self.original_mp = torch.multiprocessing.get_start_method(allow_none=False)
-        try:
-            torch.multiprocessing.set_start_method("spawn", force=True)
-        except RuntimeError:
-            pass
-
-    def tearDown(self) -> None:
-        try:
-            torch.multiprocessing.set_start_method(self.original_mp, force=True)  # type: ignore [arg-type]
-        except (RuntimeError, ValueError):
-            pass
-
     def __getstate__(self):
         self_dict = self.__dict__.copy()
         del self_dict["_outcome"]
         return self_dict
@@ -152,6 +136,8 @@ def __init__(
         timeout=60,
         init_method=None,
         backend: Optional[str] = None,
+        daemon: Optional[bool] = None,
+        method: Optional[str] = "spawn",
         verbose: bool = False,
     ):
         """
@@ -167,6 +153,10 @@ def __init__(
                 Default is "env://" or "file:///d:/a_temp" (windows) if unspecified.
             backend: The backend to use. Depending on build-time configurations,
                 valid values include ``mpi``, ``gloo``, and ``nccl``.
+            daemon: the process’s daemon flag.
+                When daemon=None, the initial value is inherited from the creating process.
+            method: set the method which should be used to start a child process.
+                method can be 'fork', 'spawn' or 'forkserver'.
             verbose: whether to print NCCL debug info.
         """
         self.nnodes = int(nnodes)
@@ -183,6 +173,9 @@ def __init__(
         if self.init_method is None and sys.platform == "win32":
             self.init_method = "file:///d:/a_temp"
         self.timeout = datetime.timedelta(0, timeout)
+        self.daemon = daemon
+        self.method = method
+        self._original_method = torch.multiprocessing.get_start_method(allow_none=False)
         self.verbose = verbose

     def run_process(self, func, local_rank, args, kwargs, results):
@@ -227,6 +220,11 @@ def __call__(self, obj):

         @functools.wraps(obj)
         def _wrapper(*args, **kwargs):
+            if self.method:
+                try:
+                    torch.multiprocessing.set_start_method(self.method, force=True)
+                except (RuntimeError, ValueError):
+                    pass
             processes = []
             results = torch.multiprocessing.Queue()
             func = _call_original_func
@@ -235,10 +233,17 @@ def _wrapper(*args, **kwargs):
                 p = torch.multiprocessing.Process(
                     target=self.run_process, args=(func, proc_rank, args, kwargs, results)
                 )
+                if self.daemon is not None:
+                    p.daemon = self.daemon
                 p.start()
                 processes.append(p)
             for p in processes:
                 p.join()
+            if self.method:
+                try:
+                    torch.multiprocessing.set_start_method(self._original_method, force=True)
+                except (RuntimeError, ValueError):
+                    pass
             assert results.get(), "Distributed call failed."

         return _wrapper
@@ -253,8 +258,8 @@ class TimedCall:
     def __init__(
         self,
         seconds: float = 60.0,
-        daemon: bool = True,
-        method: Optional[str] = None,
+        daemon: Optional[bool] = None,
+        method: Optional[str] = "spawn",
         force_quit: bool = True,
         skip_timing=False,
     ):
@@ -262,7 +267,8 @@

         Args:
             seconds: timeout seconds.
-            daemon: the process’s daemon lag.
+            daemon: the process’s daemon flag.
+                When daemon=None, the initial value is inherited from the creating process.
             method: set the method which should be used to start a child process.
                 method can be 'fork', 'spawn' or 'forkserver'.
             force_quit: whether to terminate the child process when `seconds` have elapsed.
             skip_timing: whether to skip the timing constraint.
                 this is useful when the constraint depends on system conditions such as
                 `torch.cuda.is_available()`.
""" self.timeout_seconds = seconds - self.daemon = bool(daemon) + self.daemon = daemon self.force_quit = force_quit self.skip_timing = skip_timing self.method = method - self._original_method = torch.multiprocessing.get_start_method(allow_none=False) # remember the original method + self._original_method = torch.multiprocessing.get_start_method(allow_none=False) # remember the default method @staticmethod def run_process(func, args, kwargs, results): @@ -295,11 +301,18 @@ def __call__(self, obj): @functools.wraps(obj) def _wrapper(*args, **kwargs): + + if self.method: + try: + torch.multiprocessing.set_start_method(self.method, force=True) + except (RuntimeError, ValueError): + pass func = _call_original_func args = [obj.__name__, obj.__module__] + list(args) results = torch.multiprocessing.Queue() p = torch.multiprocessing.Process(target=TimedCall.run_process, args=(func, args, kwargs, results)) - p.daemon = self.daemon + if self.daemon is not None: + p.daemon = self.daemon p.start() p.join(timeout=self.timeout_seconds) @@ -321,10 +334,17 @@ def _wrapper(*args, **kwargs): finally: p.join() + res = None try: res = results.get(block=False) except queue.Empty: # no result returned, took too long - res = None + pass + finally: + if self.method: + try: + torch.multiprocessing.set_start_method(self._original_method, force=True) + except (RuntimeError, ValueError): + pass if isinstance(res, Exception): # other errors from obj if hasattr(res, "traceback"): raise RuntimeError(res.traceback) from res From 442dab092d10e11e016fe43d18796ef6321f5304 Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Sun, 29 Nov 2020 18:49:08 +0000 Subject: [PATCH 3/7] fixes training time Signed-off-by: Wenqi Li --- tests/test_integration_classification_2d.py | 2 +- tests/test_integration_workflows.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py index f5901f7ddb..52a724dccb 100644 --- a/tests/test_integration_classification_2d.py +++ b/tests/test_integration_classification_2d.py @@ -240,7 +240,7 @@ def test_training(self): np.testing.assert_allclose(repeated[0], repeated[1]) - @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available()) + @TimedCall(seconds=300, skip_timing=not torch.cuda.is_available()) def test_timing(self): if not os.path.exists(os.path.join(self.data_dir, "MedNIST")): # skip test if no MedNIST dataset diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py index ad9f827d80..2b24853d01 100644 --- a/tests/test_integration_workflows.py +++ b/tests/test_integration_workflows.py @@ -288,7 +288,7 @@ def test_training(self): self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][2:], rtol=1e-2)) np.testing.assert_allclose(repeated[0], repeated[1]) - @TimedCall(seconds=100, skip_timing=not torch.cuda.is_available()) + @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available()) def test_timing(self): set_determinism(seed=0) @@ -297,7 +297,7 @@ def test_timing(self): self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2)) model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1] - infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2), , num_workers=0) + infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2), num_workers=0) print("infer metric", infer_metric) # check inference properties 
self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2)) From 8443de5d636def045768fa7812c418650992e5fb Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Sun, 29 Nov 2020 19:28:47 +0000 Subject: [PATCH 4/7] fixes integration tests Signed-off-by: Wenqi Li --- tests/test_integration_classification_2d.py | 5 ++--- tests/test_integration_workflows.py | 8 ++++---- tests/utils.py | 3 +++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py index 52a724dccb..75536983b2 100644 --- a/tests/test_integration_classification_2d.py +++ b/tests/test_integration_classification_2d.py @@ -240,7 +240,7 @@ def test_training(self): np.testing.assert_allclose(repeated[0], repeated[1]) - @TimedCall(seconds=300, skip_timing=not torch.cuda.is_available()) + @TimedCall(seconds=400, skip_timing=not torch.cuda.is_available(), daemon=False) def test_timing(self): if not os.path.exists(os.path.join(self.data_dir, "MedNIST")): # skip test if no MedNIST dataset @@ -254,7 +254,6 @@ def test_timing(self): self.val_x, self.val_y, device=self.device, - num_workers=0, ) # check training properties print(f"integration_classification_2d {losses}") @@ -263,7 +262,7 @@ def test_timing(self): self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4)) np.testing.assert_allclose(best_metric_epoch, 4) - infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device, num_workers=0) + infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device) print("infer metric", infer_metric) # check inference properties self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1)) diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py index 2b24853d01..165a7d6d0d 100644 --- a/tests/test_integration_workflows.py +++ b/tests/test_integration_workflows.py @@ -245,7 +245,7 @@ def setUp(self): n = nib.Nifti1Image(seg, np.eye(4)) nib.save(n, os.path.join(self.data_dir, f"seg{i:d}.nii.gz")) - self.device = "cuda:0" if torch.cuda.is_available() else "cpu:0" + self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0") monai.config.print_config() logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -288,16 +288,16 @@ def test_training(self): self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][2:], rtol=1e-2)) np.testing.assert_allclose(repeated[0], repeated[1]) - @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available()) + @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available(), daemon=False) def test_timing(self): set_determinism(seed=0) - best_metric = run_training_test(self.data_dir, device=self.device, amp=True, num_workers=0) + best_metric = run_training_test(self.data_dir, device=self.device, amp=True, num_workers=4) print("best metric", best_metric) self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2)) model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1] - infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2), num_workers=0) + infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=True, num_workers=4) print("infer metric", infer_metric) # check inference properties self.assertTrue(test_integration_value(TASK, key="infer_metric_2", 
diff --git a/tests/utils.py b/tests/utils.py
index 283ecaed72..ffbd8774b7 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -105,6 +105,9 @@ def __getstate__(self):
         del self_dict["_outcome"]
         return self_dict

+    def __setstate__(self, data_dict):
+        self.__dict__.update(data_dict)
+

 class DistCall:
     """

From c41f721b0aa15cf4bf90bf2a95bc33adb7d3a904 Mon Sep 17 00:00:00 2001
From: Wenqi Li
Date: Mon, 30 Nov 2020 05:11:49 -0500
Subject: [PATCH 5/7] updates integration tests

Signed-off-by: Wenqi Li
---
 tests/test_integration_unet_2d.py       | 5 +++--
 tests/test_integration_workflows_gan.py | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/tests/test_integration_unet_2d.py b/tests/test_integration_unet_2d.py
index b69d137d77..435fb3446f 100644
--- a/tests/test_integration_unet_2d.py
+++ b/tests/test_integration_unet_2d.py
@@ -19,7 +19,7 @@
 from monai.data import create_test_image_2d
 from monai.losses import DiceLoss
 from monai.networks.nets import BasicUNet, UNet
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick


 def run_test(net_name="basicunet", batch_size=64, train_steps=100, device="cuda:0"):
@@ -51,7 +51,8 @@ def __len__(self):


 @skip_if_quick
-class TestIntegrationUnet2D(unittest.TestCase):
+class TestIntegrationUnet2D(DistTestCase):
+    @TimedCall(seconds=20, daemon=False)
     def test_unet_training(self):
         for n in ["basicunet", "unet"]:
             loss = run_test(net_name=n, device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu:0"))
diff --git a/tests/test_integration_workflows_gan.py b/tests/test_integration_workflows_gan.py
index 922511ab84..56dd8b93e0 100644
--- a/tests/test_integration_workflows_gan.py
+++ b/tests/test_integration_workflows_gan.py
@@ -30,7 +30,7 @@
 from monai.networks.nets import Discriminator, Generator
 from monai.transforms import AsChannelFirstd, Compose, LoadNiftid, RandFlipd, ScaleIntensityd, ToTensord
 from monai.utils import set_determinism
-from tests.utils import skip_if_quick
+from tests.utils import DistTestCase, TimedCall, skip_if_quick


 def run_training_test(root_dir, device="cuda:0"):
@@ -127,7 +127,7 @@ def generator_loss(gen_images):


 @skip_if_quick
-class IntegrationWorkflowsGAN(unittest.TestCase):
+class IntegrationWorkflowsGAN(DistTestCase):
     def setUp(self):
         set_determinism(seed=0)

@@ -145,6 +145,7 @@ def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

+    @TimedCall(seconds=100, daemon=False)
     def test_training(self):
         torch.manual_seed(0)

From 82c14d5d94135ced2a6237b40faa50945b425035 Mon Sep 17 00:00:00 2001
From: Wenqi Li
Date: Wed, 2 Dec 2020 10:57:52 +0000
Subject: [PATCH 6/7] update based on comments

Signed-off-by: Wenqi Li
---
 tests/test_integration_classification_2d.py | 72 +++++++-----------
 tests/test_integration_segmentation_3d.py   | 82 +++++++++------------
 tests/test_integration_workflows.py         | 73 +++++++++---------
 3 files changed, 92 insertions(+), 135 deletions(-)

diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py
index 75536983b2..8713965a8b 100644
--- a/tests/test_integration_classification_2d.py
+++ b/tests/test_integration_classification_2d.py
@@ -208,64 +208,44 @@ def tearDown(self):
             warnings.warn("not found best_metric_model.pth, training skipped?")
             pass

-    def test_training(self):
+    def train_and_infer(self, idx=0):
+        results = []
         if not os.path.exists(os.path.join(self.data_dir, "MedNIST")):
             # skip test if no MedNIST dataset
-            return
-        repeated = []
-        for i in range(2):
-            set_determinism(seed=0)
-
-            repeated.append([])
-            losses, best_metric, best_metric_epoch = run_training_test(
-                self.data_dir, self.train_x, self.train_y, self.val_x, self.val_y, device=self.device
-            )
-
-            # check training properties
-            print(f"integration_classification_2d {losses}")
-            self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
-            repeated[i].extend(losses)
-            print("best metric", best_metric)
-            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4))
-            repeated[i].append(best_metric)
-            np.testing.assert_allclose(best_metric_epoch, 4)
-            model_file = os.path.join(self.data_dir, "best_metric_model.pth")
-            self.assertTrue(os.path.exists(model_file))
-
-            infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device)
-            print("infer metric", infer_metric)
-            # check inference properties
-            self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1))
-            repeated[i].extend(infer_metric)
+            return results

-        np.testing.assert_allclose(repeated[0], repeated[1])
-
-    @TimedCall(seconds=400, skip_timing=not torch.cuda.is_available(), daemon=False)
-    def test_timing(self):
-        if not os.path.exists(os.path.join(self.data_dir, "MedNIST")):
-            # skip test if no MedNIST dataset
-            return
         set_determinism(seed=0)
-        # run training
         losses, best_metric, best_metric_epoch = run_training_test(
-            self.data_dir,
-            self.train_x,
-            self.train_y,
-            self.val_x,
-            self.val_y,
-            device=self.device,
+            self.data_dir, self.train_x, self.train_y, self.val_x, self.val_y, device=self.device
         )
-        # check training properties
+        infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device)
+
         print(f"integration_classification_2d {losses}")
-        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
         print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        # check training properties
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-2))
         self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-4))
         np.testing.assert_allclose(best_metric_epoch, 4)
-
-        infer_metric = run_inference_test(self.data_dir, self.test_x, self.test_y, device=self.device)
-        print("infer metric", infer_metric)
+        model_file = os.path.join(self.data_dir, "best_metric_model.pth")
+        self.assertTrue(os.path.exists(model_file))
         # check inference properties
         self.assertTrue(test_integration_value(TASK, key="infer_prop", data=np.asarray(infer_metric), rtol=1))
+        results.extend(losses)
+        results.append(best_metric)
+        results.extend(infer_metric)
+        return results
+
+    def test_training(self):
+        repeated = []
+        for i in range(2):
+            results = self.train_and_infer(i)
+            repeated.append(results)
+        np.testing.assert_allclose(repeated[0], repeated[1])
+
+    @TimedCall(seconds=400, skip_timing=not torch.cuda.is_available(), daemon=False)
+    def test_timing(self):
+        self.train_and_infer()


 if __name__ == "__main__":
diff --git a/tests/test_integration_segmentation_3d.py b/tests/test_integration_segmentation_3d.py
index 3cf9503d82..cfeefc3f46 100644
--- a/tests/test_integration_segmentation_3d.py
+++ b/tests/test_integration_segmentation_3d.py
@@ -191,7 +191,7 @@ def run_inference_test(root_dir, device="cuda:0"):
         ]
     )
     val_ds = monai.data.Dataset(data=val_files, transform=val_transforms)
-    # sliding window inferene need to input 1 image in every iteration
+    # sliding window inference needs to input 1 image in every iteration
     val_loader = monai.data.DataLoader(val_ds, batch_size=1, num_workers=4)
     val_post_tran = Compose([Activations(sigmoid=True), AsDiscrete(threshold_values=True)])
     dice_metric = DiceMetric(include_background=True, reduction="mean")
@@ -246,63 +246,47 @@ def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

-    def test_training(self):
-        repeated = []
-        for i in range(4):
-            set_determinism(0)
-
-            repeated.append([])
-            losses, best_metric, best_metric_epoch = run_training_test(
-                self.data_dir, device=self.device, cachedataset=i
-            )
-
-            # check training properties
-            print("losses", losses)
-            self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
-            repeated[i].extend(losses)
-            print("best metric", best_metric)
-            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
-            repeated[i].append(best_metric)
-            self.assertTrue(len(glob(os.path.join(self.data_dir, "runs"))) > 0)
-            model_file = os.path.join(self.data_dir, "best_metric_model.pth")
-            self.assertTrue(os.path.exists(model_file))
-
-            infer_metric = run_inference_test(self.data_dir, device=self.device)
-
-            # check inference properties
-            print("infer metric", infer_metric)
-            self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
-            repeated[i].append(infer_metric)
-            output_files = sorted(glob(os.path.join(self.data_dir, "output", "img*", "*.nii.gz")))
-            print([np.mean(nib.load(output).get_fdata()) for output in output_files])
-            for output in output_files:
-                ave = np.mean(nib.load(output).get_fdata())
-                repeated[i].append(ave)
-            self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][8:], rtol=1e-2))
-        np.testing.assert_allclose(repeated[0], repeated[1])
-        np.testing.assert_allclose(repeated[0], repeated[2])
-
-    @TimedCall(seconds=180, daemon=False)
-    def test_timing(self):
+    def train_and_infer(self, idx=0):
+        results = []
         set_determinism(0)
+        losses, best_metric, best_metric_epoch = run_training_test(self.data_dir, device=self.device, cachedataset=idx)
+        infer_metric = run_inference_test(self.data_dir, device=self.device)

-        # run training
-        losses, best_metric, best_metric_epoch = run_training_test(
-            self.data_dir,
-            device=self.device,
-            cachedataset=3,
-        )
         # check training properties
         print("losses", losses)
-        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
         print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        self.assertTrue(test_integration_value(TASK, key="losses", data=losses, rtol=1e-3))
         self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
+        self.assertTrue(len(glob(os.path.join(self.data_dir, "runs"))) > 0)
+        model_file = os.path.join(self.data_dir, "best_metric_model.pth")
+        self.assertTrue(os.path.exists(model_file))

-        # run inference
-        infer_metric = run_inference_test(self.data_dir, device=self.device)
         # check inference properties
-        print("infer metric", infer_metric)
         self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
+        output_files = sorted(glob(os.path.join(self.data_dir, "output", "img*", "*.nii.gz")))
+        print([np.mean(nib.load(output).get_fdata()) for output in output_files])
+        results.extend(losses)
+        results.append(best_metric)
+        results.append(infer_metric)
+        for output in output_files:
+            ave = np.mean(nib.load(output).get_fdata())
+            results.append(ave)
+        self.assertTrue(test_integration_value(TASK, key="output_sums", data=results[8:], rtol=1e-2))
+        return results
+
+    def test_training(self):
+        repeated = []
+        for i in range(4):
+            results = self.train_and_infer(i)
+            repeated.append(results)
+        np.testing.assert_allclose(repeated[0], repeated[1])
+        np.testing.assert_allclose(repeated[0], repeated[2])
+        np.testing.assert_allclose(repeated[0], repeated[3])
+
+    @TimedCall(seconds=180, daemon=False)
+    def test_timing(self):
+        self.train_and_infer(idx=3)


 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py
index 165a7d6d0d..5d2bec1676 100644
--- a/tests/test_integration_workflows.py
+++ b/tests/test_integration_workflows.py
@@ -253,54 +253,47 @@ def tearDown(self):
         set_determinism(seed=None)
         shutil.rmtree(self.data_dir)

+    def train_and_infer(self, idx=0):
+        results = []
+        set_determinism(seed=0)
+        best_metric = run_training_test(self.data_dir, device=self.device, amp=(idx == 2))
+        model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
+        infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(idx == 2))
+
+        print("best metric", best_metric)
+        print("infer metric", infer_metric)
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
+        # check inference properties
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
+        results.append(best_metric)
+        results.append(infer_metric)
+        output_files = sorted(glob(os.path.join(self.data_dir, "img*", "*.nii.gz")))
+        for output in output_files:
+            ave = np.mean(nib.load(output).get_fdata())
+            results.append(ave)
+        if idx == 2:
+            self.assertTrue(test_integration_value(TASK, key="output_sums_2", data=results[2:], rtol=1e-2))
+        else:
+            self.assertTrue(test_integration_value(TASK, key="output_sums", data=results[2:], rtol=1e-2))
+        return results
+
     def test_training(self):
         repeated = []
         test_rounds = 3 if monai.config.get_torch_version_tuple() >= (1, 6) else 2
         for i in range(test_rounds):
-            set_determinism(seed=0)
-
-            repeated.append([])
-            best_metric = run_training_test(self.data_dir, device=self.device, amp=(i == 2))
-            print("best metric", best_metric)
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="best_metric", data=best_metric, rtol=1e-2))
-            repeated[i].append(best_metric)
-
-            model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
-            infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=(i == 2))
-            print("infer metric", infer_metric)
-            # check inference properties
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="infer_metric", data=infer_metric, rtol=1e-2))
-            repeated[i].append(infer_metric)
-
-            output_files = sorted(glob(os.path.join(self.data_dir, "img*", "*.nii.gz")))
-            for output in output_files:
-                ave = np.mean(nib.load(output).get_fdata())
-                repeated[i].append(ave)
-            if i == 2:
-                self.assertTrue(test_integration_value(TASK, key="output_sums_2", data=repeated[i][2:], rtol=1e-2))
-            else:
-                self.assertTrue(test_integration_value(TASK, key="output_sums", data=repeated[i][2:], rtol=1e-2))
+            results = self.train_and_infer(idx=i)
+            repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])

     @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available(), daemon=False)
     def test_timing(self):
-        set_determinism(seed=0)
-
-        best_metric = run_training_test(self.data_dir, device=self.device, amp=True, num_workers=4)
-        print("best metric", best_metric)
-        self.assertTrue(test_integration_value(TASK, key="best_metric_2", data=best_metric, rtol=1e-2))
-
-        model_file = sorted(glob(os.path.join(self.data_dir, "net_key_metric*.pt")))[-1]
-        infer_metric = run_inference_test(self.data_dir, model_file, device=self.device, amp=True, num_workers=4)
-        print("infer metric", infer_metric)
-        # check inference properties
-        self.assertTrue(test_integration_value(TASK, key="infer_metric_2", data=infer_metric, rtol=1e-2))
+        self.train_and_infer(idx=2)


 if __name__ == "__main__":
     unittest.main()

From ba88abb48caadb278c27b308deb406b36881f88b Mon Sep 17 00:00:00 2001
From: Wenqi Li
Date: Wed, 2 Dec 2020 08:15:11 -0500
Subject: [PATCH 7/7] update integration tests

Signed-off-by: Wenqi Li
---
 tests/test_integration_classification_2d.py |  2 +-
 tests/test_integration_workflows.py         | 13 ++++++++++++-
 tests/test_iterable_dataset.py              |  6 ++++--
 tests/utils.py                              |  4 ++--
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/tests/test_integration_classification_2d.py b/tests/test_integration_classification_2d.py
index 8713965a8b..b9a458ac86 100644
--- a/tests/test_integration_classification_2d.py
+++ b/tests/test_integration_classification_2d.py
@@ -243,7 +243,7 @@ def test_training(self):
             repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])

-    @TimedCall(seconds=400, skip_timing=not torch.cuda.is_available(), daemon=False)
+    @TimedCall(seconds=500, skip_timing=not torch.cuda.is_available(), daemon=False)
     def test_timing(self):
         self.train_and_infer()

diff --git a/tests/test_integration_workflows.py b/tests/test_integration_workflows.py
index 5d2bec1676..8d117ddb5a 100644
--- a/tests/test_integration_workflows.py
+++ b/tests/test_integration_workflows.py
@@ -15,6 +15,7 @@
 import sys
 import tempfile
 import unittest
+import warnings
 from glob import glob

 import nibabel as nib
@@ -281,6 +282,16 @@ def train_and_infer(self, idx=0):
             self.assertTrue(test_integration_value(TASK, key="output_sums_2", data=results[2:], rtol=1e-2))
         else:
             self.assertTrue(test_integration_value(TASK, key="output_sums", data=results[2:], rtol=1e-2))
+        try:
+            os.remove(model_file)
+        except Exception as e:
+            warnings.warn(f"Failed to remove {model_file}: {e}.")
+        if torch.cuda.is_available():
+            try:
+                torch.cuda.empty_cache()
+            except Exception:
+                pass
+
         return results

     def test_training(self):
@@ -291,7 +302,7 @@ def test_training(self):
             repeated.append(results)
         np.testing.assert_allclose(repeated[0], repeated[1])

-    @TimedCall(seconds=200, skip_timing=not torch.cuda.is_available(), daemon=False)
+    @TimedCall(seconds=300, skip_timing=not torch.cuda.is_available(), daemon=False)
     def test_timing(self):
         self.train_and_infer(idx=2)

diff --git a/tests/test_iterable_dataset.py b/tests/test_iterable_dataset.py
index 53f1267a2a..5a7345905e 100644
--- a/tests/test_iterable_dataset.py
+++ b/tests/test_iterable_dataset.py
@@ -38,12 +38,14 @@ def __next__(self):
         data = None
         # support multi-process access to the database
         lock.acquire()
+        count = 0
         with open(self.dbpath) as f:
             count = json.load(f)["count"]
             if count > 0:
                 data = self.data[count - 1]
-        with open(self.dbpath, "w") as f:
-            json.dump({"count": count - 1}, f)
+        if count > 0:
+            with open(self.dbpath, "w") as f:
+                json.dump({"count": count - 1}, f)
         lock.release()

         if count == 0:
diff --git a/tests/utils.py b/tests/utils.py
index ffbd8774b7..3ab73a4fcd 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -423,7 +423,7 @@ def setUp(self):
         self.segn = torch.tensor(self.segn)


-def test_script_save(net, *inputs, eval_nets=True, device=None):
+def test_script_save(net, *inputs, eval_nets=True, device=None, rtol=1e-4):
     """
     Test the ability to save `net` as a Torchscript object, reload it, and apply inference. The value `inputs` is
     forward-passed through the original and loaded copy of the network and their results returned. Both `net` and its
@@ -468,7 +468,7 @@ def test_script_save(net, *inputs, eval_nets=True, device=None):
         np.testing.assert_allclose(
             r1.detach().cpu().numpy(),
             r2.detach().cpu().numpy(),
-            rtol=1e-5,
+            rtol=rtol,
             atol=0,
             err_msg=f"failed on comparison number: {i}",
         )
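
For reference, a minimal usage sketch of the TimedCall decorator introduced by this series (hypothetical module and test names; the canonical examples live in tests/test_timedcall.py). The timed function is declared at module scope so that the spawned child process can look it up again via `__name__` and `__module__`:

    import time
    import unittest

    from tests.utils import TimedCall


    # module-level, so the child process can re-import it by name
    @TimedCall(seconds=2, force_quit=True)
    def quick_case():
        time.sleep(0.5)
        return "good"


    class TestQuickCase(unittest.TestCase):
        def test_quick(self):
            # quick_case() runs in a child process; it would raise
            # multiprocessing.TimeoutError if it took more than 2 seconds
            self.assertEqual(quick_case(), "good")


    if __name__ == "__main__":
        unittest.main()

This mirrors the pattern in tests/test_timedcall.py; passing skip_timing=True returns the function undecorated, which the integration tests above use to disable the timing constraint on CPU-only runs.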