diff --git a/n3fit/src/n3fit/backends/keras_backend/MetaModel.py b/n3fit/src/n3fit/backends/keras_backend/MetaModel.py
index f4c4cb0db6..ef8a792314 100644
--- a/n3fit/src/n3fit/backends/keras_backend/MetaModel.py
+++ b/n3fit/src/n3fit/backends/keras_backend/MetaModel.py
@@ -80,6 +80,38 @@ def _fill_placeholders(original_input, new_input=None):
     return x
 
+def aggregate_replicas(dictionary, method="append", rep_token_index=1):
+    result = {}
+    if method == "append":
+        for key, value in dictionary.items():
+            tokens = key.split('_')
+            if len(tokens) > rep_token_index and tokens[rep_token_index].startswith("rep"):
+                tokens.pop(rep_token_index)
+                newname = '_'.join(tokens)
+                if newname in result:
+                    result[newname].append(value)
+                else:
+                    result[newname] = [value]
+            else:
+                result[key] = value
+        for key, value in result.items():
+            result[key] = np.array(value)
+    elif method == "sum":
+        for key, value in dictionary.items():
+            tokens = key.split('_')
+            if len(tokens) > rep_token_index and tokens[rep_token_index].startswith("rep"):
+                tokens.pop(rep_token_index)
+                newname = '_'.join(tokens)
+                if newname in result:
+                    result[newname] += value
+                else:
+                    result[newname] = value
+            else:
+                result[key] = value
+    else:
+        raise ValueError(f"Invalid replica aggregation method: {method}")
+    return result
+
 
 class MetaModel(Model):
     """
     The `MetaModel` behaves as the tensorflow.keras.model.Model class,
@@ -191,8 +223,7 @@ def perform_fit(self, x=None, y=None, epochs=1, **kwargs):
         if y is None:
             y = self.target_tensors
         history = super().fit(x=x, y=y, epochs=epochs, **kwargs)
-        loss_dict = history.history
-        return loss_dict
+        return aggregate_replicas(history.history, method="sum")
 
     def predict(self, x=None, **kwargs):
         """ Call super().predict with the right input arguments """
@@ -216,19 +247,44 @@ def compute_losses(self):
         """
         if self.compute_losses_function is None:
             # If it is the first time we are passing through, compile the function and save it
-            out_names = [f"{i}_loss" for i in self.output_names]
+            names = {}
+            for output_name in self.output_names:
+                tokens = output_name.split('_')
+                if tokens[1].startswith("rep"):
+                    tokens.pop(1)
+                data_name = '_'.join(tokens)
+                if data_name in names:
+                    names[data_name] += 1
+                else:
+                    names[data_name] = 1
+            n_replicas = next(iter(names.values()))
+            for name in names:
+                if names[name] != n_replicas:
+                    raise ValueError(f"Dataset {name} has an unexpected number of replica output layers: {names[name]}")
+
+            out_names = [f"{i}_loss" for i in names]
             out_names.insert(0, "loss")
 
-            # Compile a evaluation function
+            # Compile evaluation function
             @tf.function
             def losses_fun():
                 predictions = self(self._parse_input(None))
                 # If we only have one dataset the output changes
                 if len(out_names) == 2:
                     predictions = [predictions]
-                total_loss = tf.reduce_sum(predictions, axis=0)
-                ret = [total_loss] + predictions
-                return dict(zip(out_names, ret))
+#                total_loss = tf.reduce_sum(predictions, axis=0)
+                total_losses = []
+                if n_replicas > 1:
+                    stacked_predictions = []
+                    for i in range(len(out_names)-1):
+                        reps = predictions[i * n_replicas: (i + 1) * n_replicas]
+                        stacked_predictions.append(tf.concat(reps, axis=0))
+                for i in range(n_replicas):
+                    rep_losses = [predictions[i + d] for d in range(0, len(predictions), n_replicas)]
+                    total_losses.append(tf.reduce_sum(rep_losses, axis=0))
+                predictions = stacked_predictions
+                result = [tf.concat(total_losses, axis=0)] + predictions
+                return dict(zip(out_names, result))
 
             self.compute_losses_function = losses_fun
 
@@ -236,7 +292,9 @@ def losses_fun():
 
         # The output of this function is to be used by python (and numpy)
         # so we need to convert the tensors
-        return _to_numpy_or_python_type(ret)
+        ret2 = _to_numpy_or_python_type(ret)
+# import pdb; pdb.set_trace()
+        return ret2
 
     def compile(
         self,
diff --git a/n3fit/src/n3fit/backends/keras_backend/callbacks.py b/n3fit/src/n3fit/backends/keras_backend/callbacks.py
index 7349d6be36..ada82f4bb6 100644
--- a/n3fit/src/n3fit/backends/keras_backend/callbacks.py
+++ b/n3fit/src/n3fit/backends/keras_backend/callbacks.py
@@ -13,6 +13,7 @@
 import numpy as np
 import tensorflow as tf
 from tensorflow.keras.callbacks import TensorBoard, Callback
+from n3fit.backends.keras_backend.MetaModel import aggregate_replicas
 
 log = logging.getLogger(__name__)
 
@@ -80,7 +81,8 @@ def on_epoch_end(self, epoch, logs=None):
         """ Function to be called at the end of every epoch """
         print_stats = ((epoch + 1) % self.log_freq) == 0
         # Note that the input logs correspond to the fit before the weights are updated
-        self.stopping_object.monitor_chi2(logs, epoch, print_stats=print_stats)
+        training_info = aggregate_replicas(logs)
+        self.stopping_object.monitor_chi2(training_info, epoch, print_stats=print_stats)
         if self.stopping_object.stop_here():
             self.model.stop_training = True
 
diff --git a/n3fit/src/n3fit/backends/keras_backend/operations.py b/n3fit/src/n3fit/backends/keras_backend/operations.py
index 8cf1065849..b8a50cf2d6 100644
--- a/n3fit/src/n3fit/backends/keras_backend/operations.py
+++ b/n3fit/src/n3fit/backends/keras_backend/operations.py
@@ -220,6 +220,11 @@ def flatten(x):
     return tf.reshape(x, (-1,))
 
 
+def expand(x, axis=-1):
+    """ Insert a size-1 axis into tensor x at the given position """
+    return tf.expand_dims(x, axis)
+
+
 def boolean_mask(*args, **kwargs):
     """
     Applies a boolean mask to a tensor
diff --git a/n3fit/src/n3fit/checks.py b/n3fit/src/n3fit/checks.py
index e07d27d74c..10caa5183e 100644
--- a/n3fit/src/n3fit/checks.py
+++ b/n3fit/src/n3fit/checks.py
@@ -359,11 +359,11 @@ def check_consistent_parallel(parameters, parallel_models, same_trvl_per_replica
     """
     if not parallel_models:
         return
-    if not same_trvl_per_replica:
-        raise CheckError(
-            "Replicas cannot be run in parallel with different training/validation "
-            " masks, please set `same_trvl_per_replica` to True in the runcard"
-        )
+#    if not same_trvl_per_replica:
+#        raise CheckError(
+#            "Replicas cannot be run in parallel with different training/validation "
+#            " masks, please set `same_trvl_per_replica` to True in the runcard"
+#        )
     if parameters.get("layer_type") != "dense":
         raise CheckError("Parallelization has only been tested with layer_type=='dense'")
 
diff --git a/n3fit/src/n3fit/model_gen.py b/n3fit/src/n3fit/model_gen.py
index 9ab6376996..5e34a4b85c 100644
--- a/n3fit/src/n3fit/model_gen.py
+++ b/n3fit/src/n3fit/model_gen.py
@@ -86,7 +86,7 @@ def __call__(self, pdf_layer, mask=None):
 
 
 def observable_generator(
-    spec_dict, positivity_initial=1.0, integrability=False
+    spec_dict, positivity_initial=1.0, integrability=False, name_suffix=''
 ):  # pylint: disable=too-many-locals
     """
     This function generates the observable model for each experiment.
@@ -153,7 +153,7 @@ def observable_generator(
     # list of fktable_dictionaries
     #   these will then be used to check how many different pdf inputs are needed
     #   (and convolutions if given the case)
-
+# TODO: Make these more memory-efficient, i.e. factoring FKTables and separate Mask layer...
     if spec_dict["positivity"]:
         # Positivity (and integrability, which is a special kind of positivity...)
         # enters only at the "training" part of the models
@@ -161,7 +161,7 @@
             dataset_dict["fktables"],
             dataset_dict["tr_fktables"],
             operation_name,
-            name=f"dat_{dataset_name}",
+            name=f"dat_{dataset_name}_{name_suffix}",
         )
         obs_layer_ex = obs_layer_vl = None
     elif spec_dict.get("data_transformation_tr") is not None:
@@ -170,7 +170,7 @@
             dataset_dict["fktables"],
             dataset_dict["ex_fktables"],
             operation_name,
-            name=f"exp_{dataset_name}",
+            name=f"exp_{dataset_name}_{name_suffix}",
         )
         obs_layer_tr = obs_layer_vl = obs_layer_ex
     else:
@@ -178,19 +178,19 @@
             dataset_dict["fktables"],
             dataset_dict["tr_fktables"],
             operation_name,
-            name=f"dat_{dataset_name}",
+            name=f"dat_{dataset_name}_{name_suffix}",
         )
         obs_layer_ex = Obs_Layer(
             dataset_dict["fktables"],
             dataset_dict["ex_fktables"],
             operation_name,
-            name=f"exp_{dataset_name}",
+            name=f"exp_{dataset_name}_{name_suffix}",
         )
         obs_layer_vl = Obs_Layer(
             dataset_dict["fktables"],
             dataset_dict["vl_fktables"],
             operation_name,
-            name=f"val_{dataset_name}",
+            name=f"val_{dataset_name}_{name_suffix}",
         )
 
     # To know how many xpoints we compute we are duplicating functionality from obs_layer
@@ -210,7 +210,7 @@
     full_nx = sum(dataset_xsizes)
     if spec_dict["positivity"]:
         out_positivity = ObservableWrapper(
-            spec_name,
+            f"{spec_name}_{name_suffix}",
             model_obs_tr,
             dataset_xsizes,
             multiplier=positivity_initial,
@@ -235,7 +235,7 @@
         obsrot_vl = None
 
     out_tr = ObservableWrapper(
-        spec_name,
+        f"{spec_name}_{name_suffix}",
         model_obs_tr,
         dataset_xsizes,
         invcovmat=spec_dict["invcovmat"],
@@ -243,7 +243,7 @@
         rotation=obsrot_tr,
     )
     out_vl = ObservableWrapper(
-        f"{spec_name}_val",
+        f"{spec_name}_{name_suffix}_val",
         model_obs_vl,
         dataset_xsizes,
         invcovmat=spec_dict["invcovmat_vl"],
@@ -251,7 +251,7 @@
         rotation=obsrot_vl,
    )
     out_exp = ObservableWrapper(
-        f"{spec_name}_exp",
+        f"{spec_name}_{name_suffix}_exp",
         model_obs_ex,
         dataset_xsizes,
         invcovmat=spec_dict["invcovmat_true"],
diff --git a/n3fit/src/n3fit/model_trainer.py b/n3fit/src/n3fit/model_trainer.py
index 5ac392ac0a..bd7c2ad783 100644
--- a/n3fit/src/n3fit/model_trainer.py
+++ b/n3fit/src/n3fit/model_trainer.py
@@ -8,6 +8,7 @@
 This allows to use hyperscanning libraries, that need to change the parameters of the network
 between iterations while at the same time keeping the amount of redundant calls to a minimum
 """
+import copy
 import logging
 from itertools import zip_longest
 import numpy as np
@@ -36,7 +37,7 @@ def _pdf_injection(pdf_layers, observables, masks):
     """
     Takes as input a list of PDF layers each corresponding to one observable (also given as a list)
-    And (where neded) a mask to select the output.
+    And (where needed) a mask to select the output.
     Returns a list of obs(pdf).
     Note that the list of masks don't need to be the same size as the list of layers/observables
     """
 
@@ -130,13 +131,13 @@
             number of models to fit in parallel
         """
         # Save all input information
-        self.exp_info = exp_info
+        self.exp_info = list(exp_info)
         self.pos_info = pos_info
         self.integ_info = integ_info
         if self.integ_info is not None:
-            self.all_info = exp_info + pos_info + integ_info
+            self.all_info = self.exp_info + pos_info + integ_info
         else:
-            self.all_info = exp_info + pos_info
+            self.all_info = self.exp_info + pos_info
         self.flavinfo = flavinfo
         self.fitbasis = fitbasis
         self._nn_seeds = nnseeds
@@ -182,9 +183,9 @@
         self.input_list = []
         self.input_sizes = []
         self.training = {
-            "output": [],
-            "expdata": [],
-            "ndata": 0,
+            "output": [[] for _ in range(parallel_models)],
+            "expdata": [[] for _ in range(parallel_models)],
+            "ndata": [0 for _ in range(parallel_models)],
             "model": None,
             "posdatasets": [],
             "posmultipliers": [],
@@ -192,30 +193,31 @@
             "integdatasets": [],
             "integmultipliers": [],
             "integinitials": [],
-            "folds": [],
+            "folds": [[] for _ in range(parallel_models)],
         }
         self.validation = {
-            "output": [],
-            "expdata": [],
-            "ndata": 0,
+            "output": [[] for _ in range(parallel_models)],
+            "expdata": [[] for _ in range(parallel_models)],
+            "ndata": [0 for _ in range(parallel_models)],
             "model": None,
-            "folds": [],
+            "folds": [[] for _ in range(parallel_models)],
             "posdatasets": [],
         }
         self.experimental = {
-            "output": [],
-            "expdata": [],
-            "ndata": 0,
+            "output": [[] for _ in range(parallel_models)],
+            "expdata": [[] for _ in range(parallel_models)],
+            "ndata": [0 for _ in range(parallel_models)],
             "model": None,
-            "folds": [],
+            "folds": [[] for _ in range(parallel_models)],
         }
 
         self._fill_the_dictionaries()
 
-        if self.validation["ndata"] == 0:
+        if self.validation["ndata"][0] == 0:
             # If there is no validation, the validation chi2 = training chi2
             self.no_validation = True
-            self.validation["expdata"] = self.training["expdata"]
+            for replica in range(parallel_models):
+                self.validation["expdata"][replica] = self.training["expdata"][replica]
         else:
             # Consider the validation only if there is validation (of course)
             self.no_validation = False
@@ -258,34 +260,45 @@ def _fill_the_dictionaries(self):
                 - ``name``: names of the experiment
                 - ``ndata``: number of experimental points
         """
-        for exp_dict in self.exp_info:
-            self.training["expdata"].append(exp_dict["expdata"])
-            self.validation["expdata"].append(exp_dict["expdata_vl"])
-            self.experimental["expdata"].append(exp_dict["expdata_true"])
+        for replica in range(self._parallel_models):
+            replica_exp_info = self.exp_info[replica]
+            for exp_dict in replica_exp_info:
+                self.training["expdata"][replica].append(exp_dict["expdata"])
+                self.validation["expdata"][replica].append(exp_dict["expdata_vl"])
+                self.experimental["expdata"][replica].append(exp_dict["expdata_true"])
 
-            self.training["folds"].append(exp_dict["folds"]["training"])
-            self.validation["folds"].append(exp_dict["folds"]["validation"])
-            self.experimental["folds"].append(exp_dict["folds"]["experimental"])
+                nd_tr = exp_dict["ndata"]
+                nd_vl = exp_dict["ndata_vl"]
 
-            nd_tr = exp_dict["ndata"]
-            nd_vl = exp_dict["ndata_vl"]
+                self.training["ndata"][replica] += nd_tr
+                self.validation["ndata"][replica] += nd_vl
+                self.experimental["ndata"][replica] += nd_tr + nd_vl
 
-            self.training["ndata"] += nd_tr
-            self.validation["ndata"] += nd_vl
-            self.experimental["ndata"] += nd_tr + nd_vl
+                for dataset in exp_dict["datasets"]:
+                    self.all_datasets.append(dataset["name"])
+
+# TODO: Figure out how parallel replicas are supposed to work with k-folding
+                if replica == 0:
+                    self.training["folds"].append(exp_dict["folds"]["training"])
+                    self.validation["folds"].append(exp_dict["folds"]["validation"])
+                    self.experimental["folds"].append(exp_dict["folds"]["experimental"])
+
+            for pos_dict in self.pos_info:
+                self.training["expdata"][replica].append(pos_dict["expdata"])
+                self.validation["expdata"][replica].append(pos_dict["expdata"])
+
+            if self.integ_info is not None:
+                for integ_dict in self.integ_info:
+                    self.training["expdata"][replica].append(integ_dict["expdata"])
 
-            for dataset in exp_dict["datasets"]:
-                self.all_datasets.append(dataset["name"])
         self.all_datasets = set(self.all_datasets)
 
         for pos_dict in self.pos_info:
-            self.training["expdata"].append(pos_dict["expdata"])
             self.training["posdatasets"].append(pos_dict["name"])
-            self.validation["expdata"].append(pos_dict["expdata"])
             self.validation["posdatasets"].append(pos_dict["name"])
+
         if self.integ_info is not None:
             for integ_dict in self.integ_info:
-                self.training["expdata"].append(integ_dict["expdata"])
                 self.training["integdatasets"].append(integ_dict["name"])
 
     def _model_generation(self, pdf_models, partition, partition_idx):
@@ -344,17 +357,22 @@
 
         # Note that all models share the same symbolic input so we take as input the last
         # full_model_input_dict in the loop
-        full_pdf_per_replica = op.stack(all_replicas_pdf, axis=-1)
-
         # The input layer was a concatenation of all experiments
         # the output of the pdf on input_layer will be thus a concatenation
         # we need now to split the output on a different array per experiment
+
         sp_ar = [self.input_sizes]
         sp_kw = {"axis": 1}
-        splitting_layer = op.as_layer(op.split, op_args=sp_ar, op_kwargs=sp_kw, name="pdf_split")
-        splitted_pdf = splitting_layer(full_pdf_per_replica)
+        splitting_op = op.as_layer(op.split, op_args=sp_ar, op_kwargs=sp_kw, name="pdf_split")
+# import pdb; pdb.set_trace()
+        splitted_pdfs = []
+        for layer in all_replicas_pdf:
+            split_layers = splitting_op(layer)
+            # TODO: This shouldn't be necessary, it introduces extra complexity to the NN, solve it another way
+            splitted_pdfs.append([op.expand(sl) for sl in split_layers])
 
         # If we are in a kfolding partition, select which datasets are out
+        # TODO: Figure out how parallel replicas are supposed to work with k-folding
         training_mask = validation_mask = experimental_mask = [None]
         if partition and partition["datasets"]:
             # If we want to overfit the fold, leave the training and validation masks as [None]
@@ -365,33 +383,45 @@
             validation_mask = [i[partition_idx] for i in self.validation["folds"]]
             experimental_mask = [i[partition_idx] for i in self.experimental["folds"]]
 
-        # Training and validation leave out the kofld dataset
+        # Training and validation leave out the k-fold dataset
         # experiment leaves out the negation
-        output_tr = _pdf_injection(splitted_pdf, self.training["output"], training_mask)
-        training = MetaModel(full_model_input_dict, output_tr)
+        nrep = len(all_replicas_pdf)
+
+        output_tr = [_pdf_injection(splitted_pdfs[r], self.training["output"][r], training_mask) for r in range(nrep)]
+        output_tr_t = sum(map(list, zip(*output_tr)), [])
+        training = MetaModel(full_model_input_dict, output_tr_t)
 
         # Validation skips integrability and the "true" chi2 skips also positivity,
         # so we must only use the corresponding subset of PDF functions
         val_pdfs = []
         exp_pdfs = []
-        for partial_pdf, obs in zip(splitted_pdf, self.training["output"]):
-            if not obs.positivity and not obs.integrability:
-                val_pdfs.append(partial_pdf)
-                exp_pdfs.append(partial_pdf)
-            elif not obs.integrability and obs.positivity:
-                val_pdfs.append(partial_pdf)
-
-        # We don't want to included the integrablity in the validation
-        output_vl = _pdf_injection(val_pdfs, self.validation["output"], validation_mask)
-        validation = MetaModel(full_model_input_dict, output_vl)
+        for replica in range(nrep):
+            val_pdfs.append([])
+            exp_pdfs.append([])
+            for partial_pdf, obs in zip(splitted_pdfs[replica], self.training["output"][replica]):
+                if not obs.positivity and not obs.integrability:
+                    val_pdfs[replica].append(partial_pdf)
+                    exp_pdfs[replica].append(partial_pdf)
+                elif not obs.integrability and obs.positivity:
+                    val_pdfs[replica].append(partial_pdf)
+
+        # We don't want to include the integrability in the validation
+        output_vl = [_pdf_injection(val_pdfs[r], self.validation["output"][r], validation_mask) for r in range(nrep)]
+        output_vl_t = sum(map(list, zip(*output_vl)), [])
+
+        validation = MetaModel(full_model_input_dict, output_vl_t)
 
         # Or the positivity in the total chi2
-        output_ex = _pdf_injection(exp_pdfs, self.experimental["output"], experimental_mask)
-        experimental = MetaModel(full_model_input_dict, output_ex)
+        output_ex = [_pdf_injection(exp_pdfs[r], self.experimental["output"][r],
+                                    experimental_mask) for r in range(nrep)]
+        output_ex_t = sum(map(list, zip(*output_ex)), [])
+        experimental = MetaModel(full_model_input_dict, output_ex_t)
 
         if self.print_summary:
             training.summary()
+            print("The validation model is:")
+            validation.summary()
 
         models = {
             "training": training,
@@ -411,13 +441,16 @@ def _reset_observables(self):
         """
         self.input_list = []
         self.input_sizes = []
-        for key in ["output", "posmultipliers", "integmultipliers"]:
+        self.training["output"] = [[] for _ in range(self._parallel_models)]
+        self.validation["output"] = [[] for _ in range(self._parallel_models)]
+        self.experimental["output"] = [[] for _ in range(self._parallel_models)]
+        for key in ["posmultipliers", "integmultipliers"]:
            self.training[key] = []
            self.validation[key] = []
            self.experimental[key] = []
 
    ############################################################################
-    # # Parametizable functions                                                #
+    # # Parameterizable functions                                              #
    # #                                                                        #
    # # The functions defined in this block accept a 'params' dictionary which #
    # # defines the fit and the behaviours of the Neural Networks              #
@@ -458,20 +491,20 @@ def _generate_observables(
         log.info("Generating layers")
 
         # Now we need to loop over all dictionaries (First exp_info, then pos_info and integ_info)
-        for exp_dict in self.exp_info:
-            if not self.mode_hyperopt:
-                log.info("Generating layers for experiment %s", exp_dict["name"])
-
-            exp_layer = model_gen.observable_generator(exp_dict)
-
-            # Save the input(s) corresponding to this experiment
-            self.input_list += exp_layer["inputs"]
-            self.input_sizes.append(exp_layer["experiment_xsize"])
-
-            # Now save the observable layer, the losses and the experimental data
-            self.training["output"].append(exp_layer["output_tr"])
-            self.validation["output"].append(exp_layer["output_vl"])
-            self.experimental["output"].append(exp_layer["output"])
+        for replica in range(self._parallel_models):
+            for exp_dict in self.exp_info[replica]:
+                if not self.mode_hyperopt:
+                    log.info("Generating layers for experiment %s", exp_dict["name"])
+                exp_layer = model_gen.observable_generator(exp_dict, name_suffix="rep"+str(replica))
+                # Now save the observable layer, the losses and the experimental data
+                self.training["output"][replica].append(exp_layer["output_tr"])
+                self.validation["output"][replica].append(exp_layer["output_vl"])
+                self.experimental["output"][replica].append(exp_layer["output"])
+
+                if replica == 0:
+                    # Save the input(s) corresponding to this experiment
+                    self.input_list += exp_layer["inputs"]
+                    self.input_sizes.append(exp_layer["experiment_xsize"])
 
         # Generate the positivity penalty
         for pos_dict in self.pos_info:
@@ -485,14 +518,16 @@
                 all_pos_initial, all_pos_multiplier, max_lambda, positivity_steps
             )
 
-            pos_layer = model_gen.observable_generator(pos_dict, positivity_initial=pos_initial)
-            # The input list is still common
-            self.input_list += pos_layer["inputs"]
-            self.input_sizes.append(pos_layer["experiment_xsize"])
-            # The positivity should be on both training and validation models
-            self.training["output"].append(pos_layer["output_tr"])
-            self.validation["output"].append(pos_layer["output_tr"])
+            for replica in range(self._parallel_models):
+                pos_layer = model_gen.observable_generator(pos_dict, positivity_initial=pos_initial,
+                                                           name_suffix="rep"+str(replica))
+                self.training["output"][replica].append(pos_layer["output_tr"])
+                self.validation["output"][replica].append(pos_layer["output_tr"])
+                # The input list is still common
+                if replica == 0:
+                    self.input_list += pos_layer["inputs"]
+                    self.input_sizes.append(pos_layer["experiment_xsize"])
 
             self.training["posmultipliers"].append(pos_multiplier)
             self.training["posinitials"].append(pos_initial)
@@ -510,15 +545,18 @@
                     all_integ_initial, all_integ_multiplier, max_lambda, integrability_steps
                 )
 
-                integ_layer = model_gen.observable_generator(
-                    integ_dict, positivity_initial=integ_initial, integrability=True
-                )
-                # The input list is still common
-                self.input_list += integ_layer["inputs"]
-                self.input_sizes.append(integ_layer["experiment_xsize"])
-                # The integrability all falls to the training
-                self.training["output"].append(integ_layer["output_tr"])
+                for replica in range(self._parallel_models):
+                    integ_layer = model_gen.observable_generator(
+                        integ_dict, positivity_initial=integ_initial, integrability=True,
+                        name_suffix="rep" + str(replica))
+                    self.training["output"][replica].append(integ_layer["output_tr"])
+
+                    if replica == 0:
+                        # The input list is still common
+                        self.input_list += integ_layer["inputs"]
+                        self.input_sizes.append(integ_layer["experiment_xsize"])
+
                 self.training["integmultipliers"].append(integ_multiplier)
                 self.training["integinitials"].append(integ_initial)
@@ -644,12 +682,13 @@ def _prepare_reporting(self, partition):
         """
         reported_keys = ["name", "count_chi2", "positivity", "integrability", "ndata", "ndata_vl"]
         reporting_list = []
-        for exp_dict in self.all_info:
+
+        for exp_dict in self.all_info[0] + self.all_info[self._parallel_models:]:
             reporting_dict = {k: exp_dict.get(k) for k in reported_keys}
             if partition:
                 # If we are in a partition we need to remove the number of datapoints
                 # in order to avoid calculating the chi2 wrong
-                for dataset in exp_dict["datasets"]:
+                for dataset in exp_dict[0]["datasets"]:
                     if dataset in partition["datasets"]:
                         ndata = dataset["ndata"]
                         frac = dataset["frac"]
@@ -669,21 +708,26 @@ def _train_and_fit(self, training_model, stopping_object, epochs=100):
             will be multiplied by their respective integrability multipliers
         """
         callback_st = callbacks.StoppingCallback(stopping_object)
-        callback_pos = callbacks.LagrangeCallback(
-            self.training["posdatasets"],
-            self.training["posmultipliers"],
-            update_freq=PUSH_POSITIVITY_EACH,
-        )
-        callback_integ = callbacks.LagrangeCallback(
-            self.training["integdatasets"],
-            self.training["integmultipliers"],
-            update_freq=PUSH_INTEGRABILITY_EACH,
-        )
+
+        callbacks_pos, callbacks_integ = [], []
+        for replica in range(self._parallel_models):
+            callback_pos = callbacks.LagrangeCallback(
+                [s + "_rep" + str(replica) for s in self.training["posdatasets"]],
+                self.training["posmultipliers"],
+                update_freq=PUSH_POSITIVITY_EACH,
+            )
+            callbacks_pos.append(callback_pos)
+            callback_integ = callbacks.LagrangeCallback(
+                [s + "_rep" + str(replica) for s in self.training["integdatasets"]],
+                self.training["integmultipliers"],
+                update_freq=PUSH_INTEGRABILITY_EACH,
+            )
+            callbacks_integ.append(callback_integ)
 
         training_model.perform_fit(
             epochs=epochs,
-            verbose=False,
-            callbacks=self.callbacks + [callback_st, callback_pos, callback_integ],
+            verbose=True,
+            callbacks=self.callbacks + [callback_st] + callbacks_pos + callbacks_integ,
         )
 
     # TODO: in order to use multireplica in hyperopt is is necessary to define what "passing" means
diff --git a/n3fit/src/n3fit/performfit.py b/n3fit/src/n3fit/performfit.py
index ff3a846196..d98f5bcc78 100644
--- a/n3fit/src/n3fit/performfit.py
+++ b/n3fit/src/n3fit/performfit.py
@@ -155,6 +155,7 @@ def performfit(
         # Parse the experiments so that the output data contain information for all replicas
         # as the only different from replica to replica is the experimental training/validation data
         all_experiments = copy.deepcopy(replica_experiments[0])
+        # n_experiments dicts
         for i_exp in range(len(all_experiments)):
             training_data = []
             validation_data = []
@@ -168,17 +169,18 @@
             replicas[0],
             replicas[0] + n_models - 1,
         )
-        replicas_info = [(replicas, all_experiments, nnseeds)]
+        replicas_info = [(replicas, replica_experiments, nnseeds)]
     else:
+        # Cases 1 and 2 above are a special case of 3 where the replica idx and the seed should
+        # be a list of just one element
         replicas_info = replicas_nnseed_fitting_data_dict
+        for i, info_tuple in enumerate(replicas_info):
+            replica_idxs = info_tuple[0]
+            nnseeds = info_tuple[2]
+            replicas_info[i] = (tuple([replica_idxs]), info_tuple[1], tuple([nnseeds]))
 
     for replica_idxs, exp_info, nnseeds in replicas_info:
-        if not parallel_models or n_models == 1:
-            # Cases 1 and 2 above are a special case of 3 where the replica idx and the seed should
-            # be a list of just one element
-            replica_idxs = [replica_idxs]
-            nnseeds = [nnseeds]
-        log.info("Starting replica fit %d", replica_idxs[0])
+        log.info("Starting replica fit " + str(replica_idxs))
 
         # Generate a ModelTrainer object
         # this object holds all necessary information to train a PDF (up to the NN definition)
diff --git a/n3fit/src/n3fit/stopping.py b/n3fit/src/n3fit/stopping.py
index 7b805beccd..b34e6423eb 100644
--- a/n3fit/src/n3fit/stopping.py
+++ b/n3fit/src/n3fit/stopping.py
@@ -123,7 +123,7 @@ def parse_losses(history_object, data, suffix="loss"):
         total_loss += loss
 
     # By taking the loss from the history object we would be saving the total loss
-    # including positivity sets and (if added/enabled) regularizsers
+    # including positivity sets and (if added/enabled) regularizers
     # instead we want to restrict ourselves to the loss coming from experiments
     # total_loss = np.mean(hobj["loss"]) / total_points
     total_loss /= total_points
@@ -354,9 +354,11 @@ def save_best_replica(self, i, epoch=None):
         If an epoch is given, save the best as the
         given epoch, otherwise use the last one
         """
+
         if epoch is None:
             epoch = self.final_epoch
-        loss = self.get_state(epoch).vl_loss[i]
+        state = self.get_state(epoch)
+        loss = state.vl_loss[i]
         self._replicas[i].register_best(loss, epoch)
 
     def all_positivity_status(self):
@@ -532,6 +534,7 @@ def monitor_chi2(self, training_info, epoch, print_stats=False):
             return False
 
         # Step 2. Compute the validation metrics
+# import pdb; pdb.set_trace()
         validation_info = self._validation.compute_losses()
 
         # Step 3. Register the current point in (the) history
@@ -668,7 +671,7 @@ def check_positivity(self, history_object):
         If the positivity loss is above the threshold, the positivity fails
         otherwise, it passes.
         It returns an array booleans which are True if positivity passed
-            story_object[key_loss] < self.threshold
+            history_object[key_loss] < self.threshold
         Parameters
         ----------
         history_object: dict
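
A note on the replica aggregation helper introduced in MetaModel.py: `aggregate_replicas` collapses the per-replica metric keys that Keras produces for the parallel models (e.g. `NMC_rep0_loss`, `NMC_rep1_loss`) back into one entry per dataset, while keys without a `rep<i>` token pass through unchanged. A minimal sketch of the two modes, using made-up dataset names and scalar per-epoch values of the kind found in the callback `logs` (the fit history stores lists per epoch instead of scalars):

    logs = {
        "loss": 5.0,
        "NMC_rep0_loss": 1.25,
        "NMC_rep1_loss": 1.50,
        "POSDY_rep0_loss": 0.125,
        "POSDY_rep1_loss": 0.25,
    }
    # method="sum" adds the replica losses together (as done on the fit history in perform_fit)
    aggregate_replicas(logs, method="sum")
    # -> {"loss": 5.0, "NMC_loss": 2.75, "POSDY_loss": 0.375}
    # the default method="append" keeps one entry per replica as a numpy array (as done in StoppingCallback)
    aggregate_replicas(logs)
    # -> {"loss": array(5.), "NMC_loss": array([1.25, 1.5]), "POSDY_loss": array([0.125, 0.25])}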
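
On the ordering of the parallel outputs: in `_model_generation` the nested replica-by-observable lists are flattened with `sum(map(list, zip(*output_tr)), [])`, which transposes them so that all replica copies of a given observable end up adjacent in the final model. This appears to be the ordering that `compute_losses` relies on when it slices `predictions[i * n_replicas : (i + 1) * n_replicas]` per dataset and picks one loss per replica with a stride of `n_replicas`. A small standalone illustration of the idiom, with strings standing in for the actual output layers:

    output_tr = [
        ["exp1_rep0", "exp2_rep0", "pos_rep0"],  # replica 0
        ["exp1_rep1", "exp2_rep1", "pos_rep1"],  # replica 1
    ]
    flat = sum(map(list, zip(*output_tr)), [])
    # -> ['exp1_rep0', 'exp1_rep1', 'exp2_rep0', 'exp2_rep1', 'pos_rep0', 'pos_rep1']

    n_replicas = 2
    # entries belonging to replica 0, one per observable (same stride as in compute_losses)
    [flat[0 + d] for d in range(0, len(flat), n_replicas)]
    # -> ['exp1_rep0', 'exp2_rep0', 'pos_rep0']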