diff --git a/.gitignore b/.gitignore index 828a6fff6..c6bd3c0dd 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ dist/ .spyproject/ .hypothesis +.pixi diff --git a/libensemble/gen_classes/__init__.py b/libensemble/gen_classes/__init__.py index d5bfedd34..f33c2ebc0 100644 --- a/libensemble/gen_classes/__init__.py +++ b/libensemble/gen_classes/__init__.py @@ -1,3 +1,2 @@ from .aposmm import APOSMM # noqa: F401 from .sampling import UniformSample, UniformSampleDicts # noqa: F401 -from .surmise import Surmise # noqa: F401 diff --git a/libensemble/gen_classes/aposmm.py b/libensemble/gen_classes/aposmm.py index d3f068577..1cb802173 100644 --- a/libensemble/gen_classes/aposmm.py +++ b/libensemble/gen_classes/aposmm.py @@ -6,7 +6,6 @@ from libensemble.generators import LibensembleGenThreadInterfacer from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP -from libensemble.tools import add_unique_random_streams class APOSMM(LibensembleGenThreadInterfacer): @@ -15,24 +14,36 @@ class APOSMM(LibensembleGenThreadInterfacer): """ def __init__( - self, History: npt.NDArray = [], persis_info: dict = {}, gen_specs: dict = {}, libE_info: dict = {}, **kwargs + self, + variables: dict, + objectives: dict, + History: npt.NDArray = [], + persis_info: dict = {}, + gen_specs: dict = {}, + libE_info: dict = {}, + **kwargs ) -> None: from libensemble.gen_funcs.persistent_aposmm import aposmm + self.variables = variables + self.objectives = objectives + gen_specs["gen_f"] = aposmm + if not gen_specs.get("out"): # gen_specs never especially changes for aposmm even as the problem varies - n = len(kwargs["lb"]) or len(kwargs["ub"]) + if not self.variables: + self.n = len(kwargs["lb"]) or len(kwargs["ub"]) + else: + self.n = len(self.variables) gen_specs["out"] = [ - ("x", float, n), - ("x_on_cube", float, n), + ("x", float, self.n), + ("x_on_cube", float, self.n), ("sim_id", int), ("local_min", bool), ("local_pt", bool), ] gen_specs["persis_in"] = ["x", "f", "local_pt", "sim_id", "sim_ended", "x_on_cube", "local_min"] - if not persis_info: - persis_info = add_unique_random_streams({}, 2, seed=4321)[1] - super().__init__(History, persis_info, gen_specs, libE_info, **kwargs) + super().__init__(variables, objectives, History, persis_info, gen_specs, libE_info, **kwargs) if not self.persis_info.get("nworkers"): self.persis_info["nworkers"] = kwargs.get("nworkers", gen_specs["user"]["max_active_runs"]) self.all_local_minima = [] diff --git a/libensemble/gen_classes/sampling.py b/libensemble/gen_classes/sampling.py index 166286482..35a075e22 100644 --- a/libensemble/gen_classes/sampling.py +++ b/libensemble/gen_classes/sampling.py @@ -3,6 +3,7 @@ import numpy as np from libensemble.generators import Generator, LibensembleGenerator +from libensemble.utils.misc import list_dicts_to_np __all__ = [ "UniformSample", @@ -31,14 +32,16 @@ class UniformSample(SampleBase): mode by adjusting the allocation function. """ - def __init__(self, _=[], persis_info={}, gen_specs={}, libE_info=None, **kwargs): - super().__init__(_, persis_info, gen_specs, libE_info, **kwargs) + def __init__(self, variables: dict, objectives: dict, _=[], persis_info={}, gen_specs={}, libE_info=None, **kwargs): + super().__init__(variables, objectives, _, persis_info, gen_specs, libE_info, **kwargs) self._get_user_params(self.gen_specs["user"]) def ask_numpy(self, n_trials): - H_o = np.zeros(n_trials, dtype=self.gen_specs["out"]) - H_o["x"] = self.persis_info["rand_stream"].uniform(self.lb, self.ub, (n_trials, self.n)) - return H_o + return list_dicts_to_np( + UniformSampleDicts( + self.variables, self.objectives, self.History, self.persis_info, self.gen_specs, self.libE_info + ).ask(n_trials) + ) def tell_numpy(self, calc_in): pass # random sample so nothing to tell @@ -53,31 +56,23 @@ class UniformSampleDicts(Generator): sampled points the first time it is called. Afterwards, it returns the number of points given. This can be used in either a batch or asynchronous mode by adjusting the allocation function. + + This currently adheres to the complete standard. """ - def __init__(self, _, persis_info, gen_specs, libE_info=None, **kwargs): + def __init__(self, variables: dict, objectives: dict, _, persis_info, gen_specs, libE_info=None, **kwargs): + self.variables = variables self.gen_specs = gen_specs self.persis_info = persis_info - self._get_user_params(self.gen_specs["user"]) def ask(self, n_trials): H_o = [] for _ in range(n_trials): - # using same rand number stream - trial = {"x": self.persis_info["rand_stream"].uniform(self.lb, self.ub, self.n)} + trial = {} + for key in self.variables.keys(): + trial[key] = self.persis_info["rand_stream"].uniform(self.variables[key][0], self.variables[key][1]) H_o.append(trial) return H_o def tell(self, calc_in): pass # random sample so nothing to tell - - # Duplicated for now - def _get_user_params(self, user_specs): - """Extract user params""" - # b = user_specs["initial_batch_size"] - self.ub = user_specs["ub"] - self.lb = user_specs["lb"] - self.n = len(self.lb) # dimension - assert isinstance(self.n, int), "Dimension must be an integer" - assert isinstance(self.lb, np.ndarray), "lb must be a numpy array" - assert isinstance(self.ub, np.ndarray), "ub must be a numpy array" diff --git a/libensemble/gen_classes/surmise.py b/libensemble/gen_classes/surmise.py deleted file mode 100644 index b62cd20dc..000000000 --- a/libensemble/gen_classes/surmise.py +++ /dev/null @@ -1,60 +0,0 @@ -import copy -import queue as thread_queue -from typing import List - -import numpy as np -from numpy import typing as npt - -from libensemble.generators import LibensembleGenThreadInterfacer - - -class Surmise(LibensembleGenThreadInterfacer): - """ - Standalone object-oriented Surmise generator - """ - - def __init__( - self, History: npt.NDArray = [], persis_info: dict = {}, gen_specs: dict = {}, libE_info: dict = {} - ) -> None: - from libensemble.gen_funcs.persistent_surmise_calib import surmise_calib - - gen_specs["gen_f"] = surmise_calib - if ("sim_id", int) not in gen_specs["out"]: - gen_specs["out"].append(("sim_id", int)) - super().__init__(History, persis_info, gen_specs, libE_info) - self.sim_id_index = 0 - self.all_cancels = [] - - def _add_sim_ids(self, array: npt.NDArray) -> npt.NDArray: - array["sim_id"] = np.arange(self.sim_id_index, self.sim_id_index + len(array)) - self.sim_id_index += len(array) - return array - - def ready_to_be_asked(self) -> bool: - """Check if the generator has the next batch of points ready.""" - return not self.outbox.empty() - - def ask_numpy(self, *args) -> npt.NDArray: - """Request the next set of points to evaluate, as a NumPy array.""" - output = super().ask_numpy() - if "cancel_requested" in output.dtype.names: - cancels = output - got_cancels_first = True - self.all_cancels.append(cancels) - else: - self.results = self._add_sim_ids(output) - got_cancels_first = False - try: - _, additional = self.outbox.get(timeout=0.2) # either cancels or new points - if got_cancels_first: - return additional["calc_out"] - self.all_cancels.append(additional["calc_out"]) - return self.results - except thread_queue.Empty: - return self.results - - def ask_updates(self) -> List[npt.NDArray]: - """Request a list of NumPy arrays containing points that should be cancelled by the workflow.""" - cancels = copy.deepcopy(self.all_cancels) - self.all_cancels = [] - return cancels diff --git a/libensemble/generators.py b/libensemble/generators.py index cae1f109e..d8cb06cb8 100644 --- a/libensemble/generators.py +++ b/libensemble/generators.py @@ -22,6 +22,10 @@ """ +class GeneratorNotStartedException(Exception): + """Exception raised by a threaded/multiprocessed generator upon being asked without having been started""" + + class Generator(ABC): """ @@ -32,9 +36,9 @@ class Generator(ABC): class MyGenerator(Generator): - def __init__(self, param): + def __init__(self, variables, objectives, param): self.param = param - self.model = None + self.model = create_model(variables, objectives, self.param) def ask(self, num_points): return create_points(num_points, self.param) @@ -47,12 +51,15 @@ def final_tell(self, results): return list(self.model) - my_generator = MyGenerator(my_parameter=100) + variables = {"a": [-1, 1], "b": [-2, 2]} + objectives = {"f": "MINIMIZE"} + + my_generator = MyGenerator(variables, objectives, my_parameter=100) gen_specs = GenSpecs(generator=my_generator, ...) """ @abstractmethod - def __init__(self, *args, **kwargs): + def __init__(self, variables: dict[str, List[float]], objectives: dict[str, str], *args, **kwargs): """ Initialize the Generator object on the user-side. Constants, class-attributes, and preparation goes here. @@ -94,12 +101,45 @@ class LibensembleGenerator(Generator): """ def __init__( - self, History: npt.NDArray = [], persis_info: dict = {}, gen_specs: dict = {}, libE_info: dict = {}, **kwargs + self, + variables: dict, + objectives: dict = {}, + History: npt.NDArray = [], + persis_info: dict = {}, + gen_specs: dict = {}, + libE_info: dict = {}, + **kwargs, ): + self.variables = variables + self.objectives = objectives + self.History = History self.gen_specs = gen_specs + self.libE_info = libE_info + + self.variables_mapping = kwargs.get("variables_mapping", {}) + + self._internal_variable = "x" # need to figure these out dynamically + self._internal_objective = "f" + + if self.variables: + + self.n = len(self.variables) + # build our own lb and ub + if "lb" not in kwargs and "ub" not in kwargs: + lb = [] + ub = [] + for i, v in enumerate(self.variables.values()): + if isinstance(v, list) and (isinstance(v[0], int) or isinstance(v[0], float)): + lb.append(v[0]) + ub.append(v[1]) + kwargs["lb"] = np.array(lb) + kwargs["ub"] = np.array(ub) + if len(kwargs) > 0: # so user can specify gen-specific parameters as kwargs to constructor - self.gen_specs["user"] = kwargs - if not persis_info: + if not self.gen_specs.get("user"): + self.gen_specs["user"] = {} + self.gen_specs["user"].update(kwargs) + if not persis_info.get("rand_stream"): self.persis_info = add_unique_random_streams({}, 4, seed=4321)[1] else: self.persis_info = persis_info @@ -121,13 +161,13 @@ def convert_np_types(dict_list): def ask(self, num_points: Optional[int] = 0) -> List[dict]: """Request the next set of points to evaluate.""" - return LibensembleGenerator.convert_np_types(np_to_list_dicts(self.ask_numpy(num_points))) + return LibensembleGenerator.convert_np_types( + np_to_list_dicts(self.ask_numpy(num_points), mapping=self.variables_mapping) + ) def tell(self, results: List[dict]) -> None: """Send the results of evaluations to the generator.""" - self.tell_numpy(list_dicts_to_np(results)) - # Note that although we'd prefer to have a complete dtype available, the gen - # doesn't have access to sim_specs["out"] currently. + self.tell_numpy(list_dicts_to_np(results, mapping=self.variables_mapping)) class LibensembleGenThreadInterfacer(LibensembleGenerator): @@ -136,36 +176,30 @@ class LibensembleGenThreadInterfacer(LibensembleGenerator): """ def __init__( - self, History: npt.NDArray = [], persis_info: dict = {}, gen_specs: dict = {}, libE_info: dict = {}, **kwargs + self, + variables: dict, + objectives: dict = {}, + History: npt.NDArray = [], + persis_info: dict = {}, + gen_specs: dict = {}, + libE_info: dict = {}, + **kwargs, ) -> None: - super().__init__(History, persis_info, gen_specs, libE_info, **kwargs) + super().__init__(variables, objectives, History, persis_info, gen_specs, libE_info, **kwargs) self.gen_f = gen_specs["gen_f"] self.History = History - self.persis_info = persis_info self.libE_info = libE_info self.thread = None def setup(self) -> None: """Must be called once before calling ask/tell. Initializes the background thread.""" - # self.inbox = thread_queue.Queue() # sending betweween HERE and gen - # self.outbox = thread_queue.Queue() - + if self.thread is not None: + return # SH this contains the thread lock - removing.... wrong comm to pass on anyway. if hasattr(Executor.executor, "comm"): del Executor.executor.comm self.libE_info["executor"] = Executor.executor - # SH - fix comment (thread and process & name object appropriately - task? qcomm?) - # self.thread = QCommThread( # TRY A PROCESS - # self.gen_f, - # None, - # self.History, - # self.persis_info, - # self.gen_specs, - # self.libE_info, - # user_function=True, - # ) # note that self.thread's inbox/outbox are unused by the underlying gen - self.thread = QCommProcess( # TRY A PROCESS self.gen_f, None, @@ -191,7 +225,7 @@ def _set_sim_ended(self, results: npt.NDArray) -> npt.NDArray: def tell(self, results: List[dict], tag: int = EVAL_GEN_TAG) -> None: """Send the results of evaluations to the generator.""" - self.tell_numpy(list_dicts_to_np(results), tag) + self.tell_numpy(list_dicts_to_np(results, mapping=self.variables_mapping), tag) def ask_numpy(self, num_points: int = 0) -> npt.NDArray: """Request the next set of points to evaluate, as a NumPy array.""" diff --git a/libensemble/tests/functionality_tests/test_asktell_sampling.py b/libensemble/tests/functionality_tests/test_asktell_sampling.py index 57db0f5e4..506118d5c 100644 --- a/libensemble/tests/functionality_tests/test_asktell_sampling.py +++ b/libensemble/tests/functionality_tests/test_asktell_sampling.py @@ -17,8 +17,7 @@ # Import libEnsemble items for this test from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f -from libensemble.gen_classes.sampling import UniformSample, UniformSampleDicts -from libensemble.gen_funcs.persistent_gen_wrapper import persistent_gen_f as gen_f +from libensemble.gen_classes.sampling import UniformSample from libensemble.libE import libE from libensemble.tools import add_unique_random_streams, parse_args @@ -51,39 +50,21 @@ def sim_f(In): }, } + variables = {"x0": [-3, 3], "x1": [-2, 2]} + + objectives = {"f": "EXPLORE"} + alloc_specs = {"alloc_f": alloc_f} exit_criteria = {"gen_max": 201} - for inst in range(4): - persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234) - - if inst == 0: - # Using wrapper - pass class - generator = UniformSample - gen_specs["gen_f"] = gen_f - gen_specs["user"]["generator"] = generator - - if inst == 1: - # Using wrapper - pass object - gen_specs["gen_f"] = gen_f - generator = UniformSample(None, persis_info[1], gen_specs, None) - gen_specs["user"]["generator"] = generator - elif inst == 2: - # Using asktell runner - pass object - gen_specs.pop("gen_f", None) - generator = UniformSample(None, persis_info[1], gen_specs, None) - gen_specs["generator"] = generator - elif inst == 3: - # Using asktell runner - pass object - with standardized interface. - gen_specs.pop("gen_f", None) - generator = UniformSampleDicts(None, persis_info[1], gen_specs, None) - gen_specs["generator"] = generator - - H, persis_info, flag = libE( - sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs=libE_specs - ) - - if is_manager: - print(H[["sim_id", "x", "f"]][:10]) - assert len(H) >= 201, f"H has length {len(H)}" - assert np.isclose(H["f"][9], 1.96760289) + persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234) + + # Using asktell runner - pass object + generator = UniformSample(variables, objectives) + gen_specs["generator"] = generator + + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs=libE_specs) + + if is_manager: + print(H[["sim_id", "x", "f"]][:10]) + assert len(H) >= 201, f"H has length {len(H)}" diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index 101759966..25fbc6afb 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -33,7 +33,6 @@ from libensemble.gen_classes import APOSMM from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, SimSpecs from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima -from libensemble.tools import save_libE_output # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": @@ -52,6 +51,8 @@ workflow.exit_criteria = ExitCriteria(sim_max=2000) aposmm = APOSMM( + variables={"x0": [-3, 3], "x1": [-2, 2]}, # we hope to combine these + objectives={"f": "MINIMIZE"}, initial_sample_size=100, sample_points=minima, localopt_method="LN_BOBYQA", @@ -59,20 +60,11 @@ xtol_abs=1e-6, ftol_abs=1e-6, max_active_runs=workflow.nworkers, # should this match nworkers always? practically? - lb=np.array([-3, -2]), - ub=np.array([3, 2]), + variables_mapping={"x": ["x0", "x1"]}, ) workflow.gen_specs = GenSpecs( persis_in=["x", "x_on_cube", "sim_id", "local_min", "local_pt", "f"], - outputs=[ - ("x", float, n), - ("x_on_cube", float, n), - ("sim_id", int), - ("local_min", bool), - ("local_pt", bool), - ("f", float), - ], generator=aposmm, batch_size=5, initial_batch_size=10, @@ -82,7 +74,7 @@ workflow.libE_specs.gen_on_manager = True workflow.add_random_streams() - H, persis_info, _ = workflow.run() + H, _, _ = workflow.run() # Perform the run @@ -96,6 +88,3 @@ # We use their values to test APOSMM has identified all minima print(np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)), flush=True) assert np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)) < tol - - persis_info[0]["comm"] = None - save_libE_output(H, persis_info, __file__, workflow.nworkers) diff --git a/libensemble/tests/regression_tests/test_asktell_surmise.py b/libensemble/tests/regression_tests/test_asktell_surmise.py deleted file mode 100644 index 1afad75c3..000000000 --- a/libensemble/tests/regression_tests/test_asktell_surmise.py +++ /dev/null @@ -1,136 +0,0 @@ -# TESTSUITE_COMMS: local -# TESTSUITE_NPROCS: 4 -# TESTSUITE_EXTRA: true -# TESTSUITE_OS_SKIP: OSX - -import os - -import numpy as np - -from libensemble.message_numbers import FINISHED_PERSISTENT_GEN_TAG - -if __name__ == "__main__": - - from libensemble.executors import Executor - from libensemble.gen_classes import Surmise - - # Import libEnsemble items for this test - from libensemble.sim_funcs.borehole_kills import borehole - from libensemble.tests.regression_tests.common import build_borehole # current location - from libensemble.tools import add_unique_random_streams - from libensemble.utils.misc import list_dicts_to_np - - sim_app = os.path.join(os.getcwd(), "borehole.x") - if not os.path.isfile(sim_app): - build_borehole() - - exctr = Executor() # Run serial sub-process in place - exctr.register_app(full_path=sim_app, app_name="borehole") - - n_init_thetas = 15 # Initial batch of thetas - n_x = 5 # No. of x values - nparams = 4 # No. of theta params - ndims = 3 # No. of x coordinates. - max_add_thetas = 20 # Max no. of thetas added for evaluation - step_add_theta = 10 # No. of thetas to generate per step, before emulator is rebuilt - n_explore_theta = 200 # No. of thetas to explore while selecting the next theta - obsvar = 10 ** (-1) # Constant for generating noise in obs - - # Batch mode until after init_sample_size (add one theta to batch for observations) - init_sample_size = (n_init_thetas + 1) * n_x - - # Stop after max_emul_runs runs of the emulator - max_evals = init_sample_size + max_add_thetas * n_x - - # Rename ensemble dir for non-interference with other regression tests - sim_specs = { - "in": ["x", "thetas"], - "out": [ - ("f", float), - ("sim_killed", bool), - ], - "user": { - "num_obs": n_x, - "init_sample_size": init_sample_size, - "poll_manager": False, - }, - } - - gen_out = [ - ("x", float, ndims), - ("thetas", float, nparams), - ("priority", int), - ("obs", float, n_x), - ("obsvar", float, n_x), - ] - - gen_specs = { - "persis_in": [o[0] for o in gen_out] + ["f", "sim_ended", "sim_id"], - "out": gen_out, - "user": { - "n_init_thetas": n_init_thetas, # Num thetas in initial batch - "num_x_vals": n_x, # Num x points to create - "step_add_theta": step_add_theta, # No. of thetas to generate per step - "n_explore_theta": n_explore_theta, # No. of thetas to explore each step - "obsvar": obsvar, # Variance for generating noise in obs - "init_sample_size": init_sample_size, # Initial batch size inc. observations - "priorloc": 1, # Prior location in the unit cube. - "priorscale": 0.2, # Standard deviation of prior - }, - } - - persis_info = add_unique_random_streams({}, 5) - surmise = Surmise(gen_specs=gen_specs, persis_info=persis_info[1]) # we add sim_id as a field to gen_specs["out"] - surmise.setup() - - initial_sample = surmise.ask() - - total_evals = 0 - - for point in initial_sample: - H_out, _a, _b = borehole( - list_dicts_to_np([point], dtype=gen_specs["out"]), {}, sim_specs, {"H_rows": np.array([point["sim_id"]])} - ) - point["f"] = H_out["f"][0] # some "bugginess" with output shape of array in simf - total_evals += 1 - - surmise.tell(initial_sample) - - requested_canceled_sim_ids = [] - - next_sample, cancels = surmise.ask(), surmise.ask_updates() - - for point in next_sample: - H_out, _a, _b = borehole( - list_dicts_to_np([point], dtype=gen_specs["out"]), {}, sim_specs, {"H_rows": np.array([point["sim_id"]])} - ) - point["f"] = H_out["f"][0] - total_evals += 1 - - surmise.tell(next_sample) - sample, cancels = surmise.ask(), surmise.ask_updates() - - while total_evals < max_evals: - - for point in sample: - H_out, _a, _b = borehole( - list_dicts_to_np([point], dtype=gen_specs["out"]), - {}, - sim_specs, - {"H_rows": np.array([point["sim_id"]])}, - ) - point["f"] = H_out["f"][0] - total_evals += 1 - surmise.tell([point]) - if surmise.ready_to_be_asked(): - new_sample, cancels = surmise.ask(), surmise.ask_updates() - for m in cancels: - requested_canceled_sim_ids.append(m) - if len(new_sample): - sample = new_sample - break - - H, persis_info, exit_code = surmise.final_tell(None) - - assert exit_code == FINISHED_PERSISTENT_GEN_TAG, "Standalone persistent_aposmm didn't exit correctly" - # assert len(requested_canceled_sim_ids), "No cancellations sent by Surmise" diff --git a/libensemble/tests/regression_tests/test_asktell_surmise_killsims.py b/libensemble/tests/regression_tests/test_asktell_surmise_killsims.py deleted file mode 100644 index 9071e80d4..000000000 --- a/libensemble/tests/regression_tests/test_asktell_surmise_killsims.py +++ /dev/null @@ -1,144 +0,0 @@ -""" -Tests libEnsemble's capability to kill/cancel simulations that are in progress. - -Execute via one of the following commands (e.g. 3 workers): - mpiexec -np 4 python test_persistent_surmise_killsims.py - python test_persistent_surmise_killsims.py --nworkers 3 --comms local - python test_persistent_surmise_killsims.py --nworkers 3 --comms tcp - -When running with the above commands, the number of concurrent evaluations of -the objective function will be 2, as one of the three workers will be the -persistent generator. - -This test is a smaller variant of test_persistent_surmise_calib.py, but which -subprocesses a compiled version of the borehole simulation. A delay is -added to simulations after the initial batch, so that the killing of running -simulations can be tested. This will only affect simulations that have already -been issued to a worker when the cancel request is registesred by the manager. - -See more information, see tutorial: -"Borehole Calibration with Selective Simulation Cancellation" -in the libEnsemble documentation. -""" - -# Do not change these lines - they are parsed by run-tests.sh -# TESTSUITE_COMMS: mpi local -# TESTSUITE_NPROCS: 3 4 -# TESTSUITE_EXTRA: true -# TESTSUITE_OS_SKIP: OSX - -# Requires: -# Install Surmise package - -import os - -import numpy as np - -from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f -from libensemble.executors.executor import Executor -from libensemble.gen_classes import Surmise - -# Import libEnsemble items for this test -from libensemble.libE import libE -from libensemble.sim_funcs.borehole_kills import borehole as sim_f -from libensemble.tests.regression_tests.common import build_borehole # current location -from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output - -# from libensemble import logger -# logger.set_level("DEBUG") # To get debug logging in ensemble.log - -if __name__ == "__main__": - nworkers, is_manager, libE_specs, _ = parse_args() - - n_init_thetas = 15 # Initial batch of thetas - n_x = 5 # No. of x values - nparams = 4 # No. of theta params - ndims = 3 # No. of x coordinates. - max_add_thetas = 20 # Max no. of thetas added for evaluation - step_add_theta = 10 # No. of thetas to generate per step, before emulator is rebuilt - n_explore_theta = 200 # No. of thetas to explore while selecting the next theta - obsvar = 10 ** (-1) # Constant for generating noise in obs - - # Batch mode until after init_sample_size (add one theta to batch for observations) - init_sample_size = (n_init_thetas + 1) * n_x - - # Stop after max_emul_runs runs of the emulator - max_evals = init_sample_size + max_add_thetas * n_x - - sim_app = os.path.join(os.getcwd(), "borehole.x") - if not os.path.isfile(sim_app): - build_borehole() - - exctr = Executor() # Run serial sub-process in place - exctr.register_app(full_path=sim_app, app_name="borehole") - - # Subprocess variant creates input and output files for each sim - libE_specs["sim_dirs_make"] = True # To keep all - make sim dirs - # libE_specs["use_worker_dirs"] = True # To overwrite - make worker dirs only - - # Rename ensemble dir for non-interference with other regression tests - libE_specs["ensemble_dir_path"] = "ensemble_calib_kills_asktell" - libE_specs["gen_on_manager"] = True - - sim_specs = { - "sim_f": sim_f, - "in": ["x", "thetas"], - "out": [ - ("f", float), - ("sim_killed", bool), # "sim_killed" is used only for display at the end of this test - ], - "user": { - "num_obs": n_x, - "init_sample_size": init_sample_size, - }, - } - - gen_out = [ - ("x", float, ndims), - ("thetas", float, nparams), - ("priority", int), - ("obs", float, n_x), - ("obsvar", float, n_x), - ] - - gen_specs = { - "persis_in": [o[0] for o in gen_out] + ["f", "sim_ended", "sim_id"], - "out": gen_out, - "user": { - "n_init_thetas": n_init_thetas, # Num thetas in initial batch - "num_x_vals": n_x, # Num x points to create - "step_add_theta": step_add_theta, # No. of thetas to generate per step - "n_explore_theta": n_explore_theta, # No. of thetas to explore each step - "obsvar": obsvar, # Variance for generating noise in obs - "init_sample_size": init_sample_size, # Initial batch size inc. observations - "priorloc": 1, # Prior location in the unit cube. - "priorscale": 0.2, # Standard deviation of prior - }, - } - - alloc_specs = { - "alloc_f": alloc_f, - "user": { - "init_sample_size": init_sample_size, - "async_return": True, # True = Return results to gen as they come in (after sample) - "active_recv_gen": True, # Persistent gen can handle irregular communications - }, - } - - persis_info = add_unique_random_streams({}, nworkers + 1) - gen_specs["generator"] = Surmise(gen_specs=gen_specs, persis_info=persis_info) - - exit_criteria = {"sim_max": max_evals} - - # Perform the run - H, persis_info, flag = libE( - sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs=alloc_specs, libE_specs=libE_specs - ) - - if is_manager: - print("Cancelled sims", H["sim_id"][H["cancel_requested"]]) - print("Kills sent by manager to running simulations", H["sim_id"][H["kill_sent"]]) - print("Killed sims", H["sim_id"][H["sim_killed"]]) - sims_done = np.count_nonzero(H["sim_ended"]) - save_libE_output(H, persis_info, __file__, nworkers) - assert sims_done == max_evals, f"Num of completed simulations should be {max_evals}. Is {sims_done}" diff --git a/libensemble/tests/unit_tests/test_asktell.py b/libensemble/tests/unit_tests/test_asktell.py index fd80b8829..1364b7031 100644 --- a/libensemble/tests/unit_tests/test_asktell.py +++ b/libensemble/tests/unit_tests/test_asktell.py @@ -1,10 +1,9 @@ import numpy as np -from libensemble.tools.tools import add_unique_random_streams from libensemble.utils.misc import list_dicts_to_np -def _check_conversion(H, npp): +def _check_conversion(H, npp, mapping={}): for field in H.dtype.names: print(f"Comparing {field}: {H[field]} {npp[field]}") @@ -25,25 +24,16 @@ def _check_conversion(H, npp): def test_asktell_sampling_and_utils(): from libensemble.gen_classes.sampling import UniformSample - persis_info = add_unique_random_streams({}, 5, seed=1234) - gen_specs = { - "out": [("x", float, (2,))], - "user": { - "lb": np.array([-3, -2]), - "ub": np.array([3, 2]), - }, - } + variables = {"x0": [-3, 3], "x1": [-2, 2]} + objectives = {"f": "EXPLORE"} # Test initialization with libensembley parameters - gen = UniformSample(None, persis_info[1], gen_specs, None) - assert len(gen.ask(10)) == 10 - - # Test initialization gen-specific keyword args - gen = UniformSample(gen_specs=gen_specs, lb=np.array([-3, -2]), ub=np.array([3, 2])) + gen = UniformSample(variables, objectives) assert len(gen.ask(10)) == 10 out_np = gen.ask_numpy(3) # should get numpy arrays, non-flattened out = gen.ask(3) # needs to get dicts, 2d+ arrays need to be flattened + assert all([len(x) == 2 for x in out]) # np_to_list_dicts is now tested # now we test list_dicts_to_np directly @@ -55,6 +45,19 @@ def test_asktell_sampling_and_utils(): for j, value in enumerate(entry.values()): assert value == out_np["x"][i][j] + variables = {"core": [-3, 3], "edge": [-2, 2]} + objectives = {"energy": "EXPLORE"} + mapping = {"x": ["core", "edge"]} + + gen = UniformSample(variables, objectives, mapping) + out = gen.ask(1) + assert len(out) == 1 + assert out[0].get("core") + assert out[0].get("edge") + + out_np = list_dicts_to_np(out, mapping=mapping) + assert out_np.dtype.names[0] == "x" + def test_awkward_list_dict(): from libensemble.utils.misc import list_dicts_to_np @@ -87,6 +90,34 @@ def test_awkward_list_dict(): assert all([i in ("x", "y", "z", "a0") for i in out_np.dtype.names]) + weird_list_dict = [ + { + "sim_id": 77, + "core": 89, + "edge": 10.1, + "beam": 76.5, + "energy": 12.34, + "local_pt": True, + "local_min": False, + }, + { + "sim_id": 10, + "core": 32.8, + "edge": 16.2, + "beam": 33.5, + "energy": 99.34, + "local_pt": False, + "local_min": False, + }, + ] + + # target dtype: [("sim_id", int), ("x, float, (3,)), ("f", float), ("local_pt", bool), ("local_min", bool)] + + mapping = {"x": ["core", "edge", "beam"], "f": ["energy"]} + out_np = list_dicts_to_np(weird_list_dict, mapping=mapping) + + assert all([i in ("sim_id", "x", "f", "local_pt", "local_min") for i in out_np.dtype.names]) + def test_awkward_H(): from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts diff --git a/libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py b/libensemble/tests/unit_tests/test_persistent_aposmm.py similarity index 92% rename from libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py rename to libensemble/tests/unit_tests/test_persistent_aposmm.py index f1959e789..25ecdfd46 100644 --- a/libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py +++ b/libensemble/tests/unit_tests/test_persistent_aposmm.py @@ -14,7 +14,6 @@ import libensemble.tests.unit_tests.setup as setup from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func, six_hump_camel_grad -from libensemble.utils.misc import list_dicts_to_np libE_info = {"comm": {}} @@ -184,11 +183,7 @@ def test_asktell_with_persistent_aposmm(): n = 2 eval_max = 2000 - gen_out = [("x", float, n), ("x_on_cube", float, n), ("sim_id", int), ("local_min", bool), ("local_pt", bool)] - gen_specs = { - "in": ["x", "f", "local_pt", "sim_id", "sim_ended", "x_on_cube", "local_min"], - "out": gen_out, "user": { "initial_sample_size": 100, "sample_points": np.round(minima, 1), @@ -198,19 +193,24 @@ def test_asktell_with_persistent_aposmm(): "ftol_abs": 1e-6, "dist_to_bound_multiple": 0.5, "max_active_runs": 6, - "lb": np.array([-3, -2]), - "ub": np.array([3, 2]), }, } - my_APOSMM = APOSMM(gen_specs=gen_specs) + variables = {"core": [-3, 3], "edge": [-2, 2]} + objectives = {"energy": "MINIMIZE"} + variables_mapping = {"x": ["core", "edge"], "f": ["energy"]} + + my_APOSMM = APOSMM( + variables=variables, objectives=objectives, gen_specs=gen_specs, variables_mapping=variables_mapping + ) + initial_sample = my_APOSMM.ask(100) total_evals = 0 eval_max = 2000 for point in initial_sample: - point["f"] = six_hump_camel_func(np.array([point["x0"], point["x1"]])) + point["energy"] = six_hump_camel_func(np.array([point["core"], point["edge"]])) total_evals += 1 my_APOSMM.tell(initial_sample) @@ -224,10 +224,10 @@ def test_asktell_with_persistent_aposmm(): for m in detected_minima: potential_minima.append(m) for point in sample: - point["f"] = six_hump_camel_func(np.array([point["x0"], point["x1"]])) + point["energy"] = six_hump_camel_func(np.array([point["core"], point["edge"]])) total_evals += 1 my_APOSMM.tell(sample) - H, persis_info, exit_code = my_APOSMM.final_tell(list_dicts_to_np(sample)) # final_tell currently requires numpy + H, persis_info, exit_code = my_APOSMM.final_tell() assert exit_code == FINISHED_PERSISTENT_GEN_TAG, "Standalone persistent_aposmm didn't exit correctly" assert persis_info.get("run_order"), "Standalone persistent_aposmm didn't do any localopt runs" diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py index 87786b832..7cc9c1a2a 100644 --- a/libensemble/utils/misc.py +++ b/libensemble/utils/misc.py @@ -2,7 +2,7 @@ Misc internal functions """ -from itertools import groupby +from itertools import chain, groupby from operator import itemgetter from typing import List @@ -108,7 +108,7 @@ def _combine_names(names: list) -> list: return list(set(out_names)) -def list_dicts_to_np(list_dicts: list, dtype: list = None) -> npt.NDArray: +def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray: if list_dicts is None: return None @@ -119,40 +119,58 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None) -> npt.NDArray: if "_id" in entry: entry["sim_id"] = entry.pop("_id") + if dtype is None: + dtype = [] + + # build a presumptive dtype + first = list_dicts[0] # for determining dtype of output np array new_dtype_names = _combine_names([i for i in first.keys()]) # -> ['x', 'y'] + fields_to_convert = list(chain.from_iterable(list(mapping.values()))) + new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list(mapping.keys()) combinable_names = [] # [['x0', 'x1'], ['y0', 'y1', 'y2'], ['z']] - for name in new_dtype_names: # is this a necessary search over the keys again? we did it earlier... + for name in new_dtype_names: combinable_group = [i for i in first.keys() if i.rstrip("0123456789") == name] if len(combinable_group) > 1: # multiple similar names, e.g. x0, x1 combinable_names.append(combinable_group) else: # single name, e.g. local_pt, a0 *AS LONG AS THERE ISNT AN A1* combinable_names.append([name]) - if dtype is None: - dtype = [] - + # build dtype of non-mapped fields if not len(dtype): - # another loop over names, there's probably a more elegant way, but my brain is fried for i, entry in enumerate(combinable_names): name = new_dtype_names[i] size = len(combinable_names[i]) - dtype.append(_decide_dtype(name, first[entry[0]], size)) + if name not in mapping: + dtype.append(_decide_dtype(name, first[entry[0]], size)) + + # append dtype of mapped float fields + if len(mapping): + for name in mapping: + size = len(mapping[name]) + dtype.append(_decide_dtype(name, 0.0, size)) # float out = np.zeros(len(list_dicts), dtype=dtype) - for i, group in enumerate(combinable_names): - new_dtype_name = new_dtype_names[i] - for j, input_dict in enumerate(list_dicts): - if len(group) == 1: # only a single name, e.g. local_pt - out[new_dtype_name][j] = input_dict[new_dtype_name] - else: # combinable names detected, e.g. x0, x1 - out[new_dtype_name][j] = tuple([input_dict[name] for name in group]) + for j, input_dict in enumerate(list_dicts): + for output_name, field_names in zip(new_dtype_names, combinable_names): + if output_name not in mapping: + out[output_name][j] = ( + tuple(input_dict[name] for name in field_names) + if len(field_names) > 1 + else input_dict[field_names[0]] + ) + else: + out[output_name][j] = ( + tuple(input_dict[name] for name in mapping[output_name]) + if len(mapping[output_name]) > 1 + else input_dict[mapping[output_name][0]] + ) return out -def np_to_list_dicts(array: npt.NDArray) -> List[dict]: +def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]: if array is None: return None out = [] @@ -160,13 +178,18 @@ def np_to_list_dicts(array: npt.NDArray) -> List[dict]: new_dict = {} for field in row.dtype.names: # non-string arrays, lists, etc. - if hasattr(row[field], "__len__") and len(row[field]) > 1 and not isinstance(row[field], str): - for i, x in enumerate(row[field]): - new_dict[field + str(i)] = x - elif hasattr(row[field], "__len__") and len(row[field]) == 1: # single-entry arrays, lists, etc. - new_dict[field] = row[field][0] # will still work on single-char strings + if field not in list(mapping.keys()): + if hasattr(row[field], "__len__") and len(row[field]) > 1 and not isinstance(row[field], str): + for i, x in enumerate(row[field]): + new_dict[field + str(i)] = x + elif hasattr(row[field], "__len__") and len(row[field]) == 1: # single-entry arrays, lists, etc. + new_dict[field] = row[field][0] # will still work on single-char strings + else: + new_dict[field] = row[field] else: - new_dict[field] = row[field] + assert array.dtype[field].shape[0] == len(mapping[field]), "unable to unpack multidimensional array" + for i, name in enumerate(mapping[field]): + new_dict[name] = row[field][i] out.append(new_dict) for entry in out: diff --git a/libensemble/utils/pydantic_bindings.py b/libensemble/utils/pydantic_bindings.py index 7ceca9615..5c1f6e17d 100644 --- a/libensemble/utils/pydantic_bindings.py +++ b/libensemble/utils/pydantic_bindings.py @@ -5,7 +5,7 @@ from libensemble import specs from libensemble.resources import platforms from libensemble.utils.misc import pydanticV1 -from libensemble.utils.validators import ( +from libensemble.utils.validators import ( # check_output_fields, _UFUNC_INVALID_ERR, _UNRECOGNIZED_ERR, check_any_workers_and_disable_rm_if_tcp, @@ -16,8 +16,8 @@ check_inputs_exist, check_logical_cores, check_mpi_runner_type, - check_output_fields, check_provided_ufuncs, + check_set_gen_specs_from_variables, check_valid_comms_type, check_valid_in, check_valid_out, @@ -104,6 +104,7 @@ class Config: __validators__={ "check_valid_out": check_valid_out, "check_valid_in": check_valid_in, + "check_set_gen_specs_from_variables": check_set_gen_specs_from_variables, "genf_set_in_out_from_attrs": genf_set_in_out_from_attrs, }, ) @@ -129,7 +130,6 @@ class Config: __base__=specs._EnsembleSpecs, __validators__={ "check_exit_criteria": check_exit_criteria, - "check_output_fields": check_output_fields, "check_H0": check_H0, "check_provided_ufuncs": check_provided_ufuncs, }, diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index d74ea89d8..eea0cfcf7 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -108,7 +108,10 @@ def __init__(self, specs): def _get_points_updates(self, batch_size: int) -> (npt.NDArray, npt.NDArray): # no ask_updates on external gens - return (list_dicts_to_np(self.gen.ask(batch_size), dtype=self.specs.get("out")), None) + return ( + list_dicts_to_np(self.gen.ask(batch_size), dtype=self.specs.get("out"), mapping=self.gen.variables_mapping), + None, + ) def _convert_tell(self, x: npt.NDArray) -> list: self.gen.tell(np_to_list_dicts(x)) @@ -137,7 +140,9 @@ def _persistent_result(self, calc_in, persis_info, libE_info): """Setup comms with manager, setup gen, loop gen to completion, return gen's results""" self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG) # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array - H_out = list_dicts_to_np(self._get_initial_ask(libE_info), dtype=self.specs.get("out")) + H_out = list_dicts_to_np( + self._get_initial_ask(libE_info), dtype=self.specs.get("out"), mapping=self.gen.variables_mapping + ) tag, Work, H_in = self.ps.send_recv(H_out) # evaluate the initial sample final_H_in = self._start_generator_loop(tag, Work, H_in) return self.gen.final_tell(final_H_in), FINISHED_PERSISTENT_GEN_TAG diff --git a/libensemble/utils/specs_checkers.py b/libensemble/utils/specs_checkers.py index cf33d359f..b8e793fa5 100644 --- a/libensemble/utils/specs_checkers.py +++ b/libensemble/utils/specs_checkers.py @@ -25,28 +25,10 @@ def _check_exit_criteria(values): return values -def _check_output_fields(values): - out_names = [e[0] for e in libE_fields] - if scg(values, "H0") is not None and scg(values, "H0").dtype.names is not None: - out_names += list(scg(values, "H0").dtype.names) - out_names += [e[0] for e in scg(values, "sim_specs").outputs] - if scg(values, "gen_specs"): - out_names += [e[0] for e in scg(values, "gen_specs").outputs] - if scg(values, "alloc_specs"): - out_names += [e[0] for e in scg(values, "alloc_specs").outputs] - - for name in scg(values, "sim_specs").inputs: - assert name in out_names, ( - name + " in sim_specs['in'] is not in sim_specs['out'], " - "gen_specs['out'], alloc_specs['out'], H0, or libE_fields." - ) - - if scg(values, "gen_specs"): - for name in scg(values, "gen_specs").inputs: - assert name in out_names, ( - name + " in gen_specs['in'] is not in sim_specs['out'], " - "gen_specs['out'], alloc_specs['out'], H0, or libE_fields." - ) +def _check_set_gen_specs_from_variables(values): + if not len(scg(values, "outputs")): + if scg(values, "generator") and len(scg(values, "generator").gen_specs["out"]): + scs(values, "outputs", scg(values, "generator").gen_specs["out"]) return values diff --git a/libensemble/utils/validators.py b/libensemble/utils/validators.py index 80abfa9a3..6cd100f4d 100644 --- a/libensemble/utils/validators.py +++ b/libensemble/utils/validators.py @@ -6,13 +6,13 @@ from libensemble.resources.platforms import Platform from libensemble.utils.misc import pydanticV1 -from libensemble.utils.specs_checkers import ( +from libensemble.utils.specs_checkers import ( # _check_output_fields, _check_any_workers_and_disable_rm_if_tcp, _check_exit_criteria, _check_H0, _check_logical_cores, - _check_output_fields, _check_set_calc_dirs_on_input_dir, + _check_set_gen_specs_from_variables, _check_set_workflow_dir, ) @@ -148,8 +148,8 @@ def check_exit_criteria(cls, values): return _check_exit_criteria(values) @root_validator - def check_output_fields(cls, values): - return _check_output_fields(values) + def check_set_gen_specs_from_variables(cls, values): + return _check_set_gen_specs_from_variables(values) @root_validator def check_H0(cls, values): @@ -246,8 +246,8 @@ def check_exit_criteria(self): return _check_exit_criteria(self) @model_validator(mode="after") - def check_output_fields(self): - return _check_output_fields(self) + def check_set_gen_specs_from_variables(self): + return _check_set_gen_specs_from_variables(self) @model_validator(mode="after") def check_H0(self):