diff --git a/.github/workflows/python_installation.yml b/.github/workflows/python_installation.yml index 9116de7943..8eddb84a83 100644 --- a/.github/workflows/python_installation.yml +++ b/.github/workflows/python_installation.yml @@ -34,6 +34,11 @@ jobs: conda config --append channels conda-forge conda config --set show_channel_urls true conda install lhapdf pandoc + - name: Install MongoDB for parallel hyperopts + shell: bash -l {0} + run: | + conda install mongodb + mongod --version - name: Install nnpdf with testing and qed extras shell: bash -l {0} run: | diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml index fc09fc9d08..f8a5f1a573 100644 --- a/conda-recipe/meta.yaml +++ b/conda-recipe/meta.yaml @@ -26,6 +26,8 @@ requirements: - psutil # to ensure n3fit affinity is with the right processors - blas==1.0 *mkl* # [osx] # Host's blas is mkl, force also runtime blas to be - hyperopt + - mongodb + - pymongo <4 - seaborn - lhapdf - sqlite diff --git a/doc/sphinx/source/n3fit/hyperopt.rst b/doc/sphinx/source/n3fit/hyperopt.rst index 83c302216d..7c7ba903e9 100644 --- a/doc/sphinx/source/n3fit/hyperopt.rst +++ b/doc/sphinx/source/n3fit/hyperopt.rst @@ -484,3 +484,38 @@ To achieve this, you can use the ``--restart`` option within the ``n3fit`` comma The above command example is effective when the number of saved trials in the ``test_run/nnfit/replica_1/tries.pkl`` is less than ``20``. If there are ``20`` or more saved trials, ``n3fit`` will simply terminate, displaying the best results. + + +Running hyperoptimizations in parallel with MongoDB +--------------------------------------------------- + +It is possible to run hyperoptimization scans in parallel using `MongoDB `_. +This functionality is provided by the :class:`~n3fit.hyper_optimization.mongofiletrials.MongoFileTrials` class, +which extends the capabilities of `hyperopt `_'s `MongoTrials` and enables the +simultaneous evaluation of multiple trials. + +To run a parallelized hyperopt search, use the following command: + +.. code-block:: bash + + n3fit hyper-quickcard.yml 1 -r N_replicas --hyperopt N_trials --parallel-hyperopt --num-mongo-workers N + +Here, ``N`` represents the number of MongoDB workers you wish to launch in parallel. +Each mongo worker handles one trial in Hyperopt. So, launching more workers allows for the simultaneous calculation of a greater number of trials. +Note that there is no need to manually launch MongoDB databases or mongo workers prior to using ``n3fit``, +as the ``mongod`` and ``hyperopt-mongo-worker`` commands are automatically executed +by :meth:`~n3fit.hyper_optimization.mongofiletrials.MongodRunner.start` and +:meth:`~n3fit.hyper_optimization.mongofiletrials.MongoFileTrials.start_mongo_workers` methods, respectivelly. +By default, the ``host`` and ``port`` arguments are set to ``localhost`` and ``27017``. The database is named ``hyperopt-db-output_name``, where +``output_name`` is set to the name of the runcard. If the ``n3fit -o OUTPUT`` option is provided, ``output_name`` is set to ``OUTPUT``, with the database being referred to as ``hyperopt-db-OUTPUT``. +If necessary, it is possible to modify all the above settings using the ``n3fit --db-host`` , ``n3fit --db-port`` and ``n3fit --db-name`` options. + +To resume a hyperopt experiment, add the ``--restart`` option to the ``n3fit`` command: + +.. code-block:: bash + + n3fit hyper-quickcard.yml 1 -r N_replicas --hyperopt N_trials --parallel-hyperopt --num-mongo-workers N --restart + +Note that, unlike in serial execution, parallel hyperoptimization runs do not generate ``tries.pkl`` files. +Instead, MongoDB databases are saved as ``hyperopt-db-output_name.tar.gz`` files inside ``replica_path`` directory. +These are conveniently extracted for reuse in restart runs. diff --git a/n3fit/src/n3fit/backends/__init__.py b/n3fit/src/n3fit/backends/__init__.py index 3676dd25d7..e48c4a856f 100644 --- a/n3fit/src/n3fit/backends/__init__.py +++ b/n3fit/src/n3fit/backends/__init__.py @@ -15,6 +15,7 @@ ) from n3fit.backends.keras_backend.internal_state import ( clear_backend_state, + get_physical_gpus, set_eager, set_initial_state, ) diff --git a/n3fit/src/n3fit/backends/keras_backend/internal_state.py b/n3fit/src/n3fit/backends/keras_backend/internal_state.py index 6cfc921c68..e818716940 100644 --- a/n3fit/src/n3fit/backends/keras_backend/internal_state.py +++ b/n3fit/src/n3fit/backends/keras_backend/internal_state.py @@ -143,3 +143,14 @@ def set_initial_state(debug=False, external_seed=None, max_cores=None, double_pr # Once again, if in debug mode or external_seed set, set also the TF seed if debug or external_seed: tf.random.set_seed(use_seed) + + +def get_physical_gpus(): + """ + Retrieve a list of all physical GPU devices available in the system. + + Returns + ------- + list: A list of TensorFlow physical devices of type 'GPU'. + """ + return tf.config.list_physical_devices('GPU') diff --git a/n3fit/src/n3fit/hyper_optimization/hyper_scan.py b/n3fit/src/n3fit/hyper_optimization/hyper_scan.py index bccb67bd5f..58681c687f 100644 --- a/n3fit/src/n3fit/hyper_optimization/hyper_scan.py +++ b/n3fit/src/n3fit/hyper_optimization/hyper_scan.py @@ -14,7 +14,7 @@ """ import copy import logging -from typing import Callable +import os import hyperopt from hyperopt.pyll.base import scope @@ -22,6 +22,7 @@ from n3fit.backends import MetaLayer, MetaModel from n3fit.hyper_optimization.filetrials import FileTrials +from n3fit.hyper_optimization.mongofiletrials import MongodRunner, MongoFileTrials log = logging.getLogger(__name__) @@ -125,28 +126,61 @@ def hyper_scan_wrapper(replica_path_set, model_trainer, hyperscanner, max_evals= """ # Tell the trainer we are doing hpyeropt model_trainer.set_hyperopt(True, keys=hyperscanner.hyper_keys) + + if hyperscanner.restart_hyperopt: + # For parallel hyperopt restarts, extract the database tar file + if hyperscanner.parallel_hyperopt: + tar_file_to_extract = f"{replica_path_set}/{hyperscanner.db_name}.tar.gz" + log.info("Restarting hyperopt run using the MongoDB database %s", tar_file_to_extract) + MongoFileTrials.extract_mongodb_database(tar_file_to_extract, path=os.getcwd()) + else: + # For sequential hyperopt restarts, reset the state of `FileTrials` saved in the pickle file + pickle_file_to_load = f"{replica_path_set}/tries.pkl" + log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load) + trials = FileTrials.from_pkl(pickle_file_to_load) + + if hyperscanner.parallel_hyperopt: + # start MongoDB database by launching `mongod` + hyperscanner.mongod_runner.ensure_database_dir_exists() + mongod = hyperscanner.mongod_runner.start() + # Generate the trials object - trials = FileTrials(replica_path_set, parameters=hyperscanner.as_dict()) + if hyperscanner.parallel_hyperopt: + # Instantiate `MongoFileTrials` + # Mongo database should have already been initiated at this point + trials = MongoFileTrials( + replica_path_set, + db_host=hyperscanner.db_host, + db_port=hyperscanner.db_port, + db_name=hyperscanner.db_name, + num_workers=hyperscanner.num_mongo_workers, + parameters=hyperscanner.as_dict(), + ) + else: + # Instantiate `FileTrials` + trials = FileTrials(replica_path_set, parameters=hyperscanner.as_dict()) + # Initialize seed for hyperopt trials.rstate = np.random.default_rng(HYPEROPT_SEED) - # For restarts, reset the state of `FileTrials` saved in the pickle file - if hyperscanner.restart_hyperopt: - pickle_file_to_load = f"{replica_path_set}/tries.pkl" - log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load) - trials = FileTrials.from_pkl(pickle_file_to_load) - - # Perform the scan - best = hyperopt.fmin( + # Call to hyperopt.fmin + fmin_args = dict( fn=model_trainer.hyperparametrizable, space=hyperscanner.as_dict(), algo=hyperopt.tpe.suggest, max_evals=max_evals, - show_progressbar=False, trials=trials, rstate=trials.rstate, - trials_save_file=trials.pkl_file, ) + if hyperscanner.parallel_hyperopt: + trials.start_mongo_workers() + best = hyperopt.fmin(**fmin_args, show_progressbar=True, max_queue_len=trials.num_workers) + trials.stop_mongo_workers() + # stop mongod command and compress database + hyperscanner.mongod_runner.stop(mongod) + trials.compress_mongodb_database() + else: + best = hyperopt.fmin(**fmin_args, show_progressbar=False, trials_save_file=trials.pkl_file) return hyperscanner.space_eval(best) @@ -205,6 +239,20 @@ def __init__(self, parameters, sampling_dict, steps=5): restart_config = sampling_dict.get("restart") self.restart_hyperopt = True if restart_config else False + # adding extra options for parallel execution + parallel_config = sampling_dict.get("parallel") + self.parallel_hyperopt = True if parallel_config else False + + # setting up MondoDB options + if self.parallel_hyperopt: + # add output_path to db name to avoid conflicts + db_name = f'{sampling_dict.get("db_name")}-{sampling_dict.get("output_path")}' + self.db_host = sampling_dict.get("db_host") + self.db_port = sampling_dict.get("db_port") + self.db_name = db_name + self.num_mongo_workers = sampling_dict.get("num_mongo_workers") + self.mongod_runner = MongodRunner(self.db_name, self.db_port) + self.hyper_keys = set([]) if "parameters" in sampling_dict: diff --git a/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py b/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py new file mode 100644 index 0000000000..c5e2d2cbd0 --- /dev/null +++ b/n3fit/src/n3fit/hyper_optimization/mongofiletrials.py @@ -0,0 +1,323 @@ +""" + Hyperopt trial object for parallel hyperoptimization with MongoDB. + Data are fetched from MongoDB databases and stored in the form of json and tar.gz files within the nnfit folder. +""" +import json +import logging +import os +import subprocess +import tarfile + +from bson import SON, ObjectId +from hyperopt.mongoexp import MongoTrials + +from n3fit.backends import get_physical_gpus +from n3fit.hyper_optimization.filetrials import space_eval_trial + +log = logging.getLogger(__name__) + + +def convert_bson_to_dict(obj): + """ + Recursively convert a BSON object to a standard Python dictionary. + + This function is particularly useful for converting MongoDB query results, + which may contain BSON types like ObjectId and SON, into a more manageable + dictionary format. + + Parameters + ---------- + obj : dict or bson.SON or list or any + The object to convert. Can be a BSON object (like SON), a dictionary + containing BSON types, a list of such objects, or any other type. + + Returns + ------- + dict or list or any + A Python dictionary with all BSON types converted to standard Python + types (e.g., ObjectId converted to string). If the input is a list, + returns a list of converted elements. For other types, returns the + object as is. + + Examples + -------- + >>> from bson import ObjectId, SON + >>> sample_son = SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]) + >>> convert_bson_to_dict(sample_son) + {'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'} + + >>> sample_list = [SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]), {'age': 30}] + >>> convert_bson_to_dict(sample_list) + [{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}, {'age': 30}] + """ + if isinstance(obj, (SON, dict)): + return {k: convert_bson_to_dict(v) for k, v in obj.items()} + if isinstance(obj, ObjectId): + return str(obj) # or just return None if you don't need the ObjectId + if isinstance(obj, list): + return [convert_bson_to_dict(v) for v in obj] + return obj + + +class MongodRunner: + """Class to manage a MongoDB instance. + + This class is responsible for automatically creating and managing a MongoDB database + using the `mongod` command. It allows for starting and stopping a MongoDB instance + programmatically. + + Parameters + ---------- + db_port: int + MongoDB database connection port. Defaults to 27017. + db_name: str + MongoDB database name. Defaults to "hyperopt-db". + """ + + def __init__(self, db_name="hyperopt-db", db_port=27017): + self.db_name = db_name + self.db_port = db_port + + def ensure_database_dir_exists(self): + """Check if MongoDB database directory exists.""" + if not os.path.exists(f"{self.db_name}"): + log.info(f"Creating MongoDB database dir {self.db_name}") + os.makedirs(self.db_name, exist_ok=True) + + def start(self): + """Starts the MongoDB instance via `mongod` command.""" + args = [ + "mongod", + "-quiet", + "--dbpath", + self.db_name, + "--port", + str(self.db_port), + "--directoryperdb", + ] + try: + mongod = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + log.info(f"Started MongoDB database {self.db_name}") + return mongod + except OSError as err: + msg = f"Failed to execute {args}. Make sure you have MongoDB installed." + raise EnvironmentError(msg) from err + + def stop(self, mongod): + """Stops `mongod` command.""" + try: + mongod.terminate() + mongod.wait() + log.info(f"Stopped mongod") + except Exception as err: + log.error(f"Failed to stop mongod: {err}") + + +class MongoFileTrials(MongoTrials): + """ + MongoDB implementation of :class:`n3fit.hyper_optimization.filetrials.FileTrials`. + + Parameters + ---------- + replica_path: path + Replica folder as generated by n3fit. + db_host: str + MongoDB database connection host. Defaults to "localhost". + db_port: int + MongoDB database connection port. Defaults to 27017. + db_name: str + MongoDB database name. Defaults to "hyperopt-db". + num_workers: int + Number of MongoDB workers to be initiated concurrently. Defaults to 1. + parameters: dict + Dictionary of parameters on which we are doing hyperoptimization. Default to None. + store_trial: bool + If True, store data into json file. Default to True. + """ + + def __init__( + self, + replica_path, + db_host="localhost", + db_port=27017, + db_name="hyperopt-db", + num_workers=1, + parameters=None, + *args, + **kwargs, + ): + self.db_host = db_host + self.db_port = str(db_port) + self.db_name = db_name + self.num_workers = num_workers + self.mongotrials_arg = ( + f"mongo://{self.db_host}:{self.db_port}/{self._process_db_name(self.db_name)}/jobs" + ) + self.workers = [] + + self._store_trial = False + self._json_file = replica_path / "tries.json" + self.database_tar_file = replica_path / f"{self.db_name}.tar.gz" + self._parameters = parameters + self._rstate = None + self._dynamic_trials = [] + + super().__init__(self.mongotrials_arg, *args, **kwargs) + + def _process_db_name(self, db_name): + """Checks if db_name contains a slash, indicating a "directory/db" format.""" + if '/' in db_name: + # Split the string by '/' and take the last part as the db name + db_name_parts = db_name.split('/') + db_name = db_name_parts[-1] + return db_name + + @property + def rstate(self): + """Returns the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`.""" + return self._rstate + + @rstate.setter + def rstate(self, random_generator): + """Sets the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`.""" + self._rstate = random_generator + + def _set_dynamic_trials(self): + """Converts self._trials to a dictionary and stores it in self._dynamic_trials.""" + self._dynamic_trials = [convert_bson_to_dict(item) for item in self._trials] + + def refresh(self): + """Fetches data from mongo database and save to a json file.""" + super().refresh() + + # convert BSON object to a dictionary + self._set_dynamic_trials() + + # write json to disk + if self._store_trial: + log.info("Storing scan in %s", self._json_file) + local_trials = [] + for idx, t in enumerate(self._dynamic_trials): + local_trials.append(t) + local_trials[idx]["misc"]["space_vals"] = space_eval_trial(self._parameters, t) + + all_to_str = json.dumps(local_trials, default=str) + with open(self._json_file, "w") as f: + f.write(all_to_str) + + # like in `FileTrials` the two methods below are implemented to avoid writing to the database twice + def new_trial_ids(self, n): + self._store_trial = False + return super().new_trial_ids(n) + + def _insert_trial_docs(self, docs): + self._store_trial = True + return super()._insert_trial_docs(docs) + + def start_mongo_workers( + self, + workdir=None, + exp_key=None, + poll_interval=0.1, + no_subprocesses=False, + max_consecutive_failures=10, + reserve_timeout=600, + ): + """Initiates all mongo workers simultaneously.""" + # get the number of gpu cards, if any + gpus_all_physical_list = get_physical_gpus() + num_gpus_available = len(gpus_all_physical_list) + if not num_gpus_available: + log.warning("No GPUs found in the system.") + + # launch mongo workers + for i in range(self.num_workers): + # construct the command to start a hyperopt-mongo-worker + args = [ + "hyperopt-mongo-worker", + "--mongo", + f"{self.db_host}:{self.db_port}/{self.db_name}", + ] + if workdir: + args.extend(["--workdir", workdir]) + if exp_key: + args.extend(["--exp-key", exp_key]) + if poll_interval: + args.extend(["--poll-interval", str(poll_interval)]) + if max_consecutive_failures: + args.extend(["--max-consecutive-failures", str(max_consecutive_failures)]) + if reserve_timeout: + args.extend(["--reserve-timeout", str(reserve_timeout)]) + if no_subprocesses: + args.append("--no-subprocesses") + + # start the worker as a subprocess + try: + my_env = os.environ.copy() + + if num_gpus_available: + # set CUDA_VISIBLE_DEVICES environment variable + # the GPU index assigned to each worker i is given by mod(i, num_gpus_available) + my_env["CUDA_VISIBLE_DEVICES"] = str(i % num_gpus_available) + # set tensorflow memory growth + my_env["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" + # avoid memory fragmentation issues? + # my_env["TF_GPU_ALLOCATOR"] = "cuda_malloc_async" + + # run mongo workers + # we could use stdout=subprocess.DEVNULL and stderr=subprocess.DEVNULL in Popen to suppress output info + worker = subprocess.Popen(args, env=my_env) + self.workers.append(worker) + log.info(f"Started mongo worker {i+1}/{self.num_workers}") + except OSError as err: + msg = f"Failed to execute {args}. Make sure you have MongoDB installed." + raise EnvironmentError(msg) from err + + def stop_mongo_workers(self): + """Terminates all active mongo workers.""" + for worker in self.workers: + try: + worker.terminate() + worker.wait() + log.info(f"Stopped mongo worker {self.workers.index(worker)+1}/{self.num_workers}") + except Exception as err: + log.error( + f"Failed to stop mongo worker {self.workers.index(worker)+1}/{self.num_workers}: {err}" + ) + + def compress_mongodb_database(self): + """Saves MongoDB database as tar file""" + # check if the database exist + if not os.path.exists(f"{self.db_name}"): + raise FileNotFoundError( + f"The MongoDB database directory '{self.db_name}' does not exist. " + "Ensure it has been initiated correctly and it is in your path." + ) + # create the tar.gz file + try: + log.info(f"Compressing MongoDB database into {self.database_tar_file}") + with tarfile.open(self.database_tar_file, "w:gz") as tar: + tar.add(self.db_name) + except tarfile.TarError as err: + raise RuntimeError(f"Error compressing the database: {err}") + + @staticmethod + def extract_mongodb_database(database_tar_file, path=os.getcwd()): + """Untar MongoDB database for use in restarts.""" + # check if the database tar file exist + if not os.path.exists(f"{database_tar_file}"): + raise FileNotFoundError( + f"The MongoDB database tar file '{database_tar_file}' does not exist." + ) + # check of the provided file is a tar type + if not tarfile.is_tarfile(database_tar_file): + raise tarfile.ReadError( + f"The file '{database_tar_file}' provided is not a tar file type." + ) + # extract tar file + try: + log.info(f"Extracting MongoDB database {database_tar_file} to {path}") + with tarfile.open(f"{database_tar_file}") as tar: + tar.extractall(path) + except tarfile.TarError as err: + raise RuntimeError(f"Error extracting the database: {err}") diff --git a/n3fit/src/n3fit/scripts/n3fit_exec.py b/n3fit/src/n3fit/scripts/n3fit_exec.py index 32ddf5260f..8364db1e3f 100755 --- a/n3fit/src/n3fit/scripts/n3fit_exec.py +++ b/n3fit/src/n3fit/scripts/n3fit_exec.py @@ -234,6 +234,17 @@ def produce_hyperscanner(self, parameters, hyperscan_config=None, hyperopt=None) return None if hyperopt and self.environment.restart: hyperscan_config.update({'restart': 'true'}) + if hyperopt and self.environment.parallel_hyperopt: + hyperscan_config.update({'parallel': 'true'}) + hyperscan_config.update( + { + 'db_host': self.environment.db_host, + 'db_port': self.environment.db_port, + 'db_name': self.environment.db_name, + 'output_path': self.environment.output_path.name, + 'num_mongo_workers': self.environment.num_mongo_workers, + } + ) return HyperScanner(parameters, hyperscan_config) @@ -261,6 +272,20 @@ def check_positive(value): parser.add_argument("--hyperopt", help="Enable hyperopt scan", default=None, type=int) parser.add_argument("--restart", help="Enable hyperopt restarts", action="store_true") + parser.add_argument( + "--parallel-hyperopt", + help="Enable hyperopt run in parallel with MongoDB", + action="store_true", + ) + parser.add_argument("--db-host", help="MongoDB host", default="localhost") + parser.add_argument("--db-port", help="MongoDB port", default=27017) + parser.add_argument("--db-name", help="MongoDB dataset name", default="hyperopt-db") + parser.add_argument( + "--num-mongo-workers", + help="Number of mongo workers to be launched simultaneously", + type=check_positive, + default=1, + ) parser.add_argument("replica", help="MC replica number", type=check_positive) parser.add_argument( "-r", @@ -272,6 +297,18 @@ def check_positive(value): def get_commandline_arguments(self, cmdline=None): args = super().get_commandline_arguments(cmdline) + + # Validate dependencies related to the --hyperopt argument + if args["hyperopt"] is None: + if args["restart"]: + raise argparse.ArgumentError( + None, "The --restart option requires --hyperopt to be set." + ) + if args["parallel_hyperopt"]: + raise argparse.ArgumentError( + None, "The --parallel-hyperopt option requires --hyperopt to be set." + ) + if args["output"] is None: args["output"] = pathlib.Path(args["config_yml"]).stem return args @@ -287,6 +324,11 @@ def run(self): self.environment.replicas = NSList(replicas, nskey="replica") self.environment.hyperopt = self.args["hyperopt"] self.environment.restart = self.args["restart"] + self.environment.parallel_hyperopt = self.args["parallel_hyperopt"] + self.environment.db_host = self.args["db_host"] + self.environment.db_port = self.args["db_port"] + self.environment.db_name = self.args["db_name"] + self.environment.num_mongo_workers = self.args["num_mongo_workers"] super().run() except N3FitError as e: log.error(f"Error in n3fit:\n{e}") diff --git a/n3fit/src/n3fit/tests/test_hyperopt.py b/n3fit/src/n3fit/tests/test_hyperopt.py index 6e79dfe7bf..294addb587 100644 --- a/n3fit/src/n3fit/tests/test_hyperopt.py +++ b/n3fit/src/n3fit/tests/test_hyperopt.py @@ -5,6 +5,8 @@ import pathlib import shutil import subprocess as sp +import tarfile +import time import numpy as np from numpy.testing import assert_approx_equal @@ -170,3 +172,144 @@ def test_restart_from_pickle(tmp_path): assert restart_json[i]['tid'] == direct_json[i]['tid'] assert restart_json[i]['misc']['idxs'] == direct_json[i]['misc']['idxs'] # Note that it doesn't check the final loss of the second trial + + +def test_parallel_hyperopt(tmp_path): + """Ensure that the parallel implementation of hyperopt with MongoDB works as expected.""" + # Prepare the run + quickcard = f"hyper-{QUICKNAME}.yml" + quickpath = REGRESSION_FOLDER / quickcard + + # Define number of trials and number of mongo-workers to launch + n_trials = 6 + n_mongo_workers = 3 + + # Set up output directories + output_sequential = tmp_path / "run_hyperopt_sequential" + output_parallel = tmp_path / "run_hyperopt_parallel" + + # cp runcard to tmp folder + shutil.copy(quickpath, tmp_path) + + # Run hyperopt sequentially + start_time = time.time() + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials} " f"-o {output_sequential}".split(), + cwd=tmp_path, + check=True, + ) + end_time = time.time() + sequential_run_time = end_time - start_time + + # Run hyperopt in parallel + start_time = time.time() + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials} " + f"--parallel-hyperopt --num-mongo-workers {n_mongo_workers} " + f"-o {output_parallel}".split(), + cwd=tmp_path, + check=True, + ) + end_time = time.time() + parallel_run_time = end_time - start_time + + # Read up generated json files + sequential_json_path = f"{output_sequential}/nnfit/replica_{REPLICA}/tries.json" + sequential_json = load_data(sequential_json_path) + parallel_json_path = f"{output_parallel}/nnfit/replica_{REPLICA}/tries.json" + parallel_json = load_data(parallel_json_path) + + # Check that the parallel run time is lower than the sequential one + assert parallel_run_time < sequential_run_time + + # Check that the final json files have the same number of trials + assert len(parallel_json) == len(sequential_json) + + for i in range(n_trials): + # Check that the files share the same content + assert len(parallel_json[i]['misc']) == len(sequential_json[i]['misc']) + assert len(parallel_json[i]['result']) == len(sequential_json[i]['result']) + # Note: cannot check that they share exactly the same history + # as the hyperopt algorithm depends on the results from previous runs + # which is obviously different between parallel and sequential runs + + +def clean_up_database(tmp_path, database_name): + """Stops the MongoDB database.""" + directory_path = f"{tmp_path}/{database_name}" + try: + sp.run(f"rm -r {directory_path}", shell=True, check=True) + except (sp.CalledProcessError, OSError) as err: + msg = f"Error cleaning up database: {err}" + raise EnvironmentError(msg) from err + + +def get_tar_size(filetar): + """Returns the size of a tar file.""" + + def tar_size(tar): + return sum(member.size for member in tar.getmembers()) + + with tarfile.open(filetar, 'r') as tar: + size = tar_size(tar) + return size + + +def test_restart_from_tar(tmp_path): + """Ensure that our parallel hyperopt restart works as expected.""" + # Prepare the run + quickcard = f"hyper-{QUICKNAME}.yml" + quickpath = REGRESSION_FOLDER / quickcard + + # Set up some options + n_mongo_workers = 3 + n_trials_stop = 3 + n_trials_total = 6 + output = tmp_path / "output" + database_name = f"hyperopt-db-{output.name}" + + # cp runcard to tmp folder + shutil.copy(quickpath, tmp_path) + # run some trials for the first time + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials_stop} " + f"--parallel-hyperopt --num-mongo-workers {n_mongo_workers} " + f"-o {output}".split(), + cwd=tmp_path, + check=True, + ) + json_path = f"{output}/nnfit/replica_{REPLICA}/tries.json" + tar_name = f"{output}/nnfit/replica_{REPLICA}/{database_name}.tar.gz" + initial_json = load_data(json_path) + initial_tar_size = get_tar_size(tar_name) + + # just in case, remove old database files to ensure that the restart occurs via tar file + clean_up_database(tmp_path, database_name) + + # restart and calculate more trials + sp.run( + f"{EXE} {quickpath} {REPLICA} --hyperopt {n_trials_total} " + f"--parallel-hyperopt --num-mongo-workers {n_mongo_workers} " + f"-o {output} --restart".split(), + cwd=tmp_path, + check=True, + ) + final_json = load_data(json_path) + final_tar_size = get_tar_size(tar_name) + + # check if the calculations went well + assert len(initial_json) == n_trials_stop + assert len(final_json) == n_trials_total + + # check if the tar files were generated correctly + assert tarfile.is_tarfile(tar_name) is True + + # check if the final tar file was updated after restart + assert final_tar_size > initial_tar_size + + for i in range(n_trials_stop): + # check that the json files share exactly the same hyperopt history until the restart + assert initial_json[i]['misc'] == final_json[i]['misc'] + assert initial_json[i]['state'] == final_json[i]['state'] + assert initial_json[i]['tid'] == final_json[i]['tid'] + assert initial_json[i]['result'] == final_json[i]['result'] diff --git a/pyproject.toml b/pyproject.toml index 9d24b2d8c7..b9060119da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,8 @@ eko = "^0.14.1" # Hyperopt hyperopt = "*" seaborn = "*" +# Hyperopt parallel +pymongo = "<4" # LHAPDF installation for debugging purposes # a3b2bbc3ced97675ac3a71df45f55ba = "*" # Optional dependencies