diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py
index 27120b8dcc..69cb25cc0e 100644
--- a/monai/apps/auto3dseg/bundle_gen.py
+++ b/monai/apps/auto3dseg/bundle_gen.py
@@ -13,12 +13,12 @@
 
 import importlib
 import os
+import re
 import shutil
 import subprocess
 import sys
 import time
 import warnings
-from collections.abc import Mapping
 from copy import deepcopy
 from pathlib import Path
 from tempfile import TemporaryDirectory
@@ -30,10 +30,17 @@
 from monai.apps import download_and_extract
 from monai.apps.utils import get_logger
 from monai.auto3dseg.algo_gen import Algo, AlgoGen
-from monai.auto3dseg.utils import algo_to_pickle
+from monai.auto3dseg.utils import (
+    _prepare_cmd_bcprun,
+    _prepare_cmd_default,
+    _prepare_cmd_torchrun,
+    _run_cmd_bcprun,
+    _run_cmd_torchrun,
+    algo_to_pickle,
+)
 from monai.bundle.config_parser import ConfigParser
 from monai.config import PathLike
-from monai.utils import ensure_tuple, run_cmd
+from monai.utils import ensure_tuple, look_up_option, run_cmd
 from monai.utils.enums import AlgoKeys
 
 logger = get_logger(module_name=__name__)
@@ -88,7 +95,7 @@ def __init__(self, template_path: PathLike):
             "n_devices": int(torch.cuda.device_count()),
             "NUM_NODES": int(os.environ.get("NUM_NODES", 1)),
             "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"),
-            "CMD_PREFIX": os.environ.get("CMD_PREFIX"),  # type: ignore
+            "CMD_PREFIX": os.environ.get("CMD_PREFIX", ""),  # type: ignore
         }
 
     def pre_check_skip_algo(self, skip_bundlegen: bool = False, skip_info: str = "") -> tuple[bool, str]:
@@ -175,36 +182,45 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]:
         train_py = os.path.join(self.output_path, "scripts", "train.py")
         config_dir = os.path.join(self.output_path, "configs")
 
+        config_files = []
         if os.path.isdir(config_dir):
-            base_cmd = ""
             for file in sorted(os.listdir(config_dir)):
-                if not (file.endswith("yaml") or file.endswith("json")):
-                    continue
-                base_cmd += f"{train_py} run --config_file=" if len(base_cmd) == 0 else ","
-                # Python Fire may be confused by single-quoted WindowsPath
-                config_yaml = Path(os.path.join(config_dir, file)).as_posix()
-                base_cmd += f"'{config_yaml}'"
-        cmd: str | None = self.device_setting["CMD_PREFIX"]  # type: ignore
-        # make sure cmd end with a space
-        if cmd is not None and not cmd.endswith(" "):
-            cmd += " "
-        if (int(self.device_setting["NUM_NODES"]) > 1 and self.device_setting["MN_START_METHOD"] == "bcprun") or (
-            int(self.device_setting["NUM_NODES"]) <= 1 and int(self.device_setting["n_devices"]) <= 1
-        ):
-            cmd = "python " if cmd is None else cmd
-        elif int(self.device_setting["NUM_NODES"]) > 1:
-            raise NotImplementedError(
-                f"{self.device_setting['MN_START_METHOD']} is not supported yet."
-                "Try modify BundleAlgo._create_cmd for your cluster."
+                if file.endswith("yaml") or file.endswith("json"):
+                    # Python Fire may be confused by single-quoted WindowsPath
+                    config_files.append(Path(os.path.join(config_dir, file)).as_posix())
+
+        if int(self.device_setting["NUM_NODES"]) > 1:
+            # multi-node command
+            # only bcprun is supported for now
+            try:
+                look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
+            except ValueError as err:
+                raise NotImplementedError(
+                    f"{self.device_setting['MN_START_METHOD']} is not supported yet."
+                    "Try modify BundleAlgo._create_cmd for your cluster."
+                ) from err
+
+            return (
+                _prepare_cmd_bcprun(
+                    f"{train_py} run",
+                    cmd_prefix=f"{self.device_setting['CMD_PREFIX']}",
+                    config_file=config_files,
+                    **params,
+                ),
+                "",
             )
+        elif int(self.device_setting["n_devices"]) > 1:
+            return _prepare_cmd_torchrun(f"{train_py} run", config_file=config_files, **params), ""
         else:
-            if cmd is None:
-                cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
-        cmd += base_cmd
-        if params and isinstance(params, Mapping):
-            for k, v in params.items():
-                cmd += f" --{k}={v}"
-        return cmd, ""
+            return (
+                _prepare_cmd_default(
+                    f"{train_py} run",
+                    cmd_prefix=f"{self.device_setting['CMD_PREFIX']}",
+                    config_file=config_files,
+                    **params,
+                ),
+                "",
+            )
 
     def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess:
         """
@@ -216,34 +232,26 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc
         """
         ps_environ = os.environ.copy()
         ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])
+
+        # delete pattern "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces
+        cmd = re.sub(r"^\s*\w+=.*?\s+", "", cmd)
+
         if int(self.device_setting["NUM_NODES"]) > 1:
-            if self.device_setting["MN_START_METHOD"] == "bcprun":
-                cmd_list = [
-                    "bcprun",
-                    "-n",
-                    str(self.device_setting["NUM_NODES"]),
-                    "-p",
-                    str(self.device_setting["n_devices"]),
-                    "-c",
-                    cmd,
-                ]
-            else:
+            try:
+                look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
+            except ValueError as err:
                 raise NotImplementedError(
-                    f"{self.device_setting['MN_START_METHOD']} is not supported yet. "
+                    f"{self.device_setting['MN_START_METHOD']} is not supported yet."
                     "Try modify BundleAlgo._run_cmd for your cluster."
-                )
-        else:
-            cmd_list = cmd.split()
-
-            _idx = 0
-            for _idx, c in enumerate(cmd_list):
-                if "=" not in c:  # remove variable assignments before the command such as "OMP_NUM_THREADS=1"
-                    break
-            cmd_list = cmd_list[_idx:]
+                ) from err
 
-        logger.info(f"Launching: {' '.join(cmd_list)}")
-
-        return run_cmd(cmd_list, env=ps_environ, check=True)
+            return _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])
+        elif int(self.device_setting["n_devices"]) > 1:
+            return _run_cmd_torchrun(
+                cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
+            )
+        else:
+            return run_cmd(cmd.split(), env=ps_environ, check=True)
 
     def train(
         self, train_params: None | dict = None, device_setting: None | dict = None
diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py
index afb15d5d3e..ce2c2895d6 100644
--- a/monai/apps/auto3dseg/ensemble_builder.py
+++ b/monai/apps/auto3dseg/ensemble_builder.py
@@ -26,13 +26,19 @@
 from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history
 from monai.apps.utils import get_logger
 from monai.auto3dseg import concat_val_to_np
-from monai.auto3dseg.utils import datafold_read
+from monai.auto3dseg.utils import (
+    _prepare_cmd_bcprun,
+    _prepare_cmd_torchrun,
+    _run_cmd_bcprun,
+    _run_cmd_torchrun,
+    datafold_read,
+)
 from monai.bundle import ConfigParser
 from monai.data import partition_dataset
 from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble
 from monai.utils import RankFilter, deprecated_arg
 from monai.utils.enums import AlgoKeys
-from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd
+from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class
 from monai.utils.module import look_up_option, optional_import
 
 tqdm, has_tqdm = optional_import("tqdm", name="tqdm")
@@ -642,9 +648,6 @@ def _create_cmd(self) -> None:
         # define env for subprocess
         ps_environ = os.environ.copy()
         ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])
-        cmd: str | None = self.device_setting["CMD_PREFIX"]  # type: ignore
-        if cmd is not None and not str(cmd).endswith(" "):
-            cmd += " "
         if int(self.device_setting["NUM_NODES"]) > 1:
             if self.device_setting["MN_START_METHOD"] != "bcprun":
                 raise NotImplementedError(
@@ -652,24 +655,13 @@ def _create_cmd(self) -> None:
                     "Try modify EnsembleRunner._create_cmd for your cluster."
                 )
             logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!")
-            cmd = "python " if cmd is None else cmd
-            cmd = f"{cmd} -m {base_cmd}"
-            cmd_list = [
-                "bcprun",
-                "-n",
-                str(self.device_setting["NUM_NODES"]),
-                "-p",
-                str(self.device_setting["n_devices"]),
-                "-c",
-                cmd,
-            ]
+            cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=f"{self.device_setting['CMD_PREFIX']}")
+            _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])
         else:
             logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!")
-            if cmd is None:
-                cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
-            cmd = f"{cmd} -m {base_cmd}"
-            cmd_list = cmd.split()
-
-            run_cmd(cmd_list, env=ps_environ, check=True)
+            cmd = _prepare_cmd_torchrun("-m " + base_cmd)
+            _run_cmd_torchrun(
+                cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
+            )
         return
 
diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py
index 2f5e1b26eb..0da4ee9f26 100644
--- a/monai/auto3dseg/utils.py
+++ b/monai/auto3dseg/utils.py
@@ -14,6 +14,7 @@
 import logging
 import os
 import pickle
+import subprocess
 import sys
 from copy import deepcopy
 from numbers import Number
@@ -28,7 +29,7 @@
 from monai.config import PathLike
 from monai.data.meta_tensor import MetaTensor
 from monai.transforms import CropForeground, ToCupy
-from monai.utils import min_version, optional_import
+from monai.utils import min_version, optional_import, run_cmd
 
 __all__ = [
     "get_foreground_image",
@@ -372,3 +373,151 @@ def algo_from_pickle(pkl_filename: str, template_path: PathLike | None = None, *
             algo_meta_data.update({k: v})
 
     return algo, algo_meta_data
+
+
+def list_to_python_fire_arg_str(args: list) -> str:
+    """
+    Convert a list of arguments to a string that can be used in python-fire.
+
+    Args:
+        args: the list of arguments.
+
+    Returns:
+        the string that can be used in python-fire.
+    """
+    args_str = ",".join([str(arg) for arg in args])
+    return f"'{args_str}'"
+
+
+def check_and_set_optional_args(params: dict) -> str:
+    """
+    Convert the key-value pairs in ``params`` into a string of ``--key value`` command line arguments.
+
+    List values are converted with ``list_to_python_fire_arg_str``; nested dicts are not supported.
+    """
+    cmd_mod_opt = ""
+    for k, v in params.items():
+        if isinstance(v, dict):
+            raise ValueError("Nested dict is not supported.")
+        elif isinstance(v, list):
+            v = list_to_python_fire_arg_str(v)
+        cmd_mod_opt += f" --{k} {str(v)}"
+    return cmd_mod_opt
+
+
+def _prepare_cmd_default(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str:
+    """
+    Prepare the command for subprocess to run the script with the given arguments.
+
+    Args:
+        cmd: the command or script to run in the distributed job.
+        cmd_prefix: the command prefix to run the script, e.g., "python", "python -m", "python3", "/opt/conda/bin/python3.8 ".
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        the command to run with ``subprocess``.
+
+    Examples:
+        To prepare a subprocess command
+        "python train.py run -k --config 'a,b'", the function can be called as
+        - _prepare_cmd_default("train.py run -k", config=['a','b'])
+        - _prepare_cmd_default("train.py run -k --config 'a,b'")
+
+    """
+    params = kwargs.copy()
+
+    cmd_prefix = cmd_prefix or "python"
+
+    if not cmd_prefix.endswith(" "):
+        cmd_prefix += " "  # ensure a space after the command prefix so that the script can be appended
+
+    return cmd_prefix + cmd + check_and_set_optional_args(params)
+
+
+def _prepare_cmd_torchrun(cmd: str, **kwargs: Any) -> str:
+    """
+    Prepare the command for multi-gpu/multi-node job execution using torchrun.
+
+    Args:
+        cmd: the command or script to run in the distributed job.
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        the command to append to ``torchrun``.
+
+    Examples:
+        For the command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'",
+        it only prepares the part after the torchrun arguments, i.e., "train.py run -k --config 'a,b'".
+        The function can be called as
+        - _prepare_cmd_torchrun("train.py run -k", config=['a','b'])
+        - _prepare_cmd_torchrun("train.py run -k --config 'a,b'")
+    """
+    params = kwargs.copy()
+    return cmd + check_and_set_optional_args(params)
+
+
+def _prepare_cmd_bcprun(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str:
+    """
+    Prepare the command for a distributed job running using bcprun.
+
+    Args:
+        cmd: the command or script to run in the distributed job.
+        cmd_prefix: the command prefix to run the script, e.g., "python".
+        kwargs: the keyword arguments to be passed to the script.
+
+    Returns:
+        The command to run the script in the distributed job.
+
+    Examples:
+        For the command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'",
+        it only prepares the part after the bcprun arguments, i.e., "python train.py run -k --config 'a,b'"
+        (the ``-n``/``-p`` options are supplied later to ``_run_cmd_bcprun``).
+        The function can be called as
+        - _prepare_cmd_bcprun("train.py run -k", config=['a','b'])
+        - _prepare_cmd_bcprun("train.py run -k --config 'a,b'")
+    """
+
+    return _prepare_cmd_default(cmd, cmd_prefix=cmd_prefix, **kwargs)
+
+
+def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
+    """
+    Run the command with torchrun.
+
+    Args:
+        cmd: the command to run. Typically it is prepared by ``_prepare_cmd_torchrun``.
+        kwargs: the keyword arguments to be passed to ``torchrun``; ``nnodes`` and ``nproc_per_node``
+            are required, and the remaining arguments are forwarded to ``run_cmd``.
+
+    Returns:
+        the completed process (``subprocess.CompletedProcess``) of the command.
+    """
+    params = kwargs.copy()
+
+    cmd_list = cmd.split()
+
+    # append arguments to the command list
+    torchrun_list = ["torchrun"]
+    required_args = ["nnodes", "nproc_per_node"]
+    for arg in required_args:
+        if arg not in params:
+            raise ValueError(f"Missing required argument {arg} for torchrun.")
+        torchrun_list += [f"--{arg}", str(params.pop(arg))]
+    torchrun_list += cmd_list
+    return run_cmd(torchrun_list, **params)
+
+
+def _run_cmd_bcprun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
+    """
+    Run the command with bcprun.
+
+    Args:
+        cmd: the command to run. Typically it is prepared by ``_prepare_cmd_bcprun``.
+        kwargs: the keyword arguments to be passed to ``bcprun``; ``n`` (nodes) and ``p`` (processes per node)
+            are required, and the remaining arguments are forwarded to ``run_cmd``.
+
+    Returns:
+        the completed process (``subprocess.CompletedProcess``) of the command.
+    """
+    params = kwargs.copy()
+    cmd_list = ["bcprun"]
+    required_args = ["n", "p"]
+    for arg in required_args:
+        if arg not in params:
+            raise ValueError(f"Missing required argument {arg} for bcprun.")
+        cmd_list += [f"-{arg}", str(params.pop(arg))]
+    cmd_list.extend(["-c", cmd])
+    return run_cmd(cmd_list, **params)
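
Note: as a quick illustration of how the new prepare/run helpers introduced above are meant to compose, here is a minimal sketch (not part of the patch; the config paths and device count are made-up placeholders):

    from monai.auto3dseg.utils import _prepare_cmd_torchrun, _run_cmd_torchrun

    # build the part of the command line that follows the torchrun arguments,
    # i.e. "train.py run --config_file 'configs/a.yaml,configs/b.yaml'"
    cmd = _prepare_cmd_torchrun("train.py run", config_file=["configs/a.yaml", "configs/b.yaml"])

    # prepend "torchrun --nnodes 1 --nproc_per_node 2" and execute via monai.utils.run_cmd;
    # extra keyword arguments (e.g. check=True) are forwarded to run_cmd
    _run_cmd_torchrun(cmd, nnodes=1, nproc_per_node=2, check=True)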
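
The reworked BundleAlgo._run_cmd also strips a leading "VAR=VALUE" assignment (e.g. coming from CMD_PREFIX) before dispatching the command; a standalone sketch of that regex with a made-up command string:

    import re

    cmd = "OMP_NUM_THREADS=1 python train.py run"
    # removes the leading environment-style assignment, leaving "python train.py run"
    print(re.sub(r"^\s*\w+=.*?\s+", "", cmd))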