Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions n3fit/src/n3fit/backends/keras_backend/MetaModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,38 @@ def _fill_placeholders(original_input, new_input=None):
return x


def aggregate_replicas(dictionary, method="append", rep_token_index=1):
    """Aggregate per-replica entries of a dictionary into per-dataset entries.

    Keys are expected to be underscore-separated layer/loss names in which the
    token at position ``rep_token_index`` identifies the replica (the token
    starts with ``"rep"``), e.g. ``"dat_rep0_DATASET_loss"``.  That token is
    stripped and all values sharing the resulting name are combined.  Keys
    without a replica token are passed through unchanged (although with
    ``method="append"`` they are still converted to numpy arrays, matching the
    replica entries).

    Parameters
    ----------
        dictionary: dict
            mapping of names (e.g. output-layer or loss names) to values
        method: str
            how to combine the per-replica values:
            ``"append"`` collects them into a numpy array (one entry per
            replica), ``"sum"`` adds them together
        rep_token_index: int
            position of the replica token within the underscore-split key

    Returns
    -------
        dict
            aggregated dictionary with the replica token stripped from keys

    Raises
    ------
        ValueError
            if ``method`` is neither ``"append"`` nor ``"sum"``
    """
    if method not in ("append", "sum"):
        raise ValueError(f"Invalid replica aggregation method: {method}")
    result = {}
    for key, value in dictionary.items():
        tokens = key.split('_')
        if len(tokens) > rep_token_index and tokens[rep_token_index].startswith("rep"):
            tokens.pop(rep_token_index)
            newname = '_'.join(tokens)
            if method == "append":
                result.setdefault(newname, []).append(value)
            elif newname in result:
                result[newname] += value
            else:
                result[newname] = value
        else:
            # No replica token at the expected position: keep the entry as-is
            result[key] = value
    if method == "append":
        # Convert the accumulated lists (and passed-through values) to arrays
        for key, value in result.items():
            result[key] = np.array(value)
    return result

class MetaModel(Model):
"""
The `MetaModel` behaves as the tensorflow.keras.model.Model class,
Expand Down Expand Up @@ -191,8 +223,7 @@ def perform_fit(self, x=None, y=None, epochs=1, **kwargs):
if y is None:
y = self.target_tensors
history = super().fit(x=x, y=y, epochs=epochs, **kwargs)
loss_dict = history.history
return loss_dict
return aggregate_replicas(history.history, method="sum")

def predict(self, x=None, **kwargs):
""" Call super().predict with the right input arguments """
Expand All @@ -216,27 +247,54 @@ def compute_losses(self):
"""
if self.compute_losses_function is None:
# If it is the first time we are passing through, compile the function and save it
out_names = [f"{i}_loss" for i in self.output_names]
names = {}
for output_name in self.output_names:
tokens = output_name.split('_')
if tokens[1].startswith("rep"):
tokens.pop(1)
data_name = '_'.join(tokens)
if data_name in names:
names[data_name] += 1
else:
names[data_name] = 1
n_replicas = next(iter(names.values()))
for name in names:
if names[name] != n_replicas:
raise ValueError(f"Dataset {name} has unexpected no replica output layers {names[name]}")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this error ever expected? If so, and depending on the situation, I would prefer to make sure that it is caught before the fit starts loading the data, otherwise is a very expensive crash.

Also, I'm not entirely sure I understand the logic of this loop. What is this used for? If it is just for the check I would rather remove it. When the fit goes to the point of computing the loss there should be no doubt whatsoever that all replicas are working, if there is a problem it should've been caught and fixed earlier.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop is simply there to compose the out_names and this entire block including the losses_fun() can be simplified by saving some information at initialization. Actually this thing should be handled by a 'aggregate_losses' type of utility method, but it works at the level of tensors, which is why it's still there...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why have the out_names changed with respect to the previous version?

Also, is the error ever expected? As I said, if it is, then it should be handled elsewhere.


out_names = [f"{i}_loss" for i in names]
out_names.insert(0, "loss")

# Compile a evaluation function
# Compile evaluation function
@tf.function
def losses_fun():
predictions = self(self._parse_input(None))
# If we only have one dataset the output changes
if len(out_names) == 2:
predictions = [predictions]
total_loss = tf.reduce_sum(predictions, axis=0)
ret = [total_loss] + predictions
return dict(zip(out_names, ret))
# total_loss = tf.reduce_sum(predictions, axis=0)
total_losses = []
if n_replicas > 1:
stacked_predictions = []
for i in range(len(out_names)-1):
reps = predictions[i * n_replicas: (i + 1) * n_replicas]
stacked_predictions.append(tf.concat(reps, axis=0))
for i in range(n_replicas):
rep_losses = [predictions[i + d] for d in range(0, len(predictions), n_replicas)]
total_losses.append(tf.reduce_sum(rep_losses, axis=0))
predictions = stacked_predictions
result = [tf.concat(total_losses, axis=0)] + predictions
return dict(zip(out_names, result))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather lose some granularity on the loss than complicate (with quite a few loops) something that is going to be computed once per epoch. Take into account that we don't really care about the individual loss of the datasets (actually, when we will be doing the uncertainties, there will be no individual groups).
The loss per replica might be interesting, but again, it is something that can be recovered at the end.


self.compute_losses_function = losses_fun

ret = self.compute_losses_function()

# The output of this function is to be used by python (and numpy)
# so we need to convert the tensors
return _to_numpy_or_python_type(ret)
ret2 = _to_numpy_or_python_type(ret)
# import pdb; pdb.set_trace()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# import pdb; pdb.set_trace()

return ret2

def compile(
self,
Expand Down
4 changes: 3 additions & 1 deletion n3fit/src/n3fit/backends/keras_backend/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard, Callback
from n3fit.backends.keras_backend.MetaModel import aggregate_replicas

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,7 +81,8 @@ def on_epoch_end(self, epoch, logs=None):
""" Function to be called at the end of every epoch """
print_stats = ((epoch + 1) % self.log_freq) == 0
# Note that the input logs correspond to the fit before the weights are updated
self.stopping_object.monitor_chi2(logs, epoch, print_stats=print_stats)
training_info = aggregate_replicas(logs)
self.stopping_object.monitor_chi2(training_info, epoch, print_stats=print_stats)
if self.stopping_object.stop_here():
self.model.stop_training = True

Expand Down
5 changes: 5 additions & 0 deletions n3fit/src/n3fit/backends/keras_backend/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ def flatten(x):
return tf.reshape(x, (-1,))


def expand(x, axis=-1):
    """ Insert a new axis of size 1 into tensor x at position ``axis``

    Thin wrapper around ``tf.expand_dims``; unlike a general reshape it only
    adds a singleton dimension, leaving all existing dimensions untouched.
    """
    return tf.expand_dims(x, axis)


def boolean_mask(*args, **kwargs):
"""
Applies a boolean mask to a tensor
Expand Down
10 changes: 5 additions & 5 deletions n3fit/src/n3fit/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ def check_consistent_parallel(parameters, parallel_models, same_trvl_per_replica
"""
if not parallel_models:
return
if not same_trvl_per_replica:
raise CheckError(
"Replicas cannot be run in parallel with different training/validation "
" masks, please set `same_trvl_per_replica` to True in the runcard"
)
# if not same_trvl_per_replica:
# raise CheckError(
# "Replicas cannot be run in parallel with different training/validation "
# " masks, please set `same_trvl_per_replica` to True in the runcard"
# )
Comment on lines +362 to +366
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# if not same_trvl_per_replica:
# raise CheckError(
# "Replicas cannot be run in parallel with different training/validation "
# " masks, please set `same_trvl_per_replica` to True in the runcard"
# )

if parameters.get("layer_type") != "dense":
raise CheckError("Parallelization has only been tested with layer_type=='dense'")

Expand Down
22 changes: 11 additions & 11 deletions n3fit/src/n3fit/model_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __call__(self, pdf_layer, mask=None):


def observable_generator(
spec_dict, positivity_initial=1.0, integrability=False
spec_dict, positivity_initial=1.0, integrability=False, name_suffix=''
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing the name_suffix is so that different observable are treated differently so one can have different trvl masks. Please add to the docstr.

): # pylint: disable=too-many-locals
"""
This function generates the observable model for each experiment.
Expand Down Expand Up @@ -153,15 +153,15 @@ def observable_generator(
# list of fktable_dictionaries
# these will then be used to check how many different pdf inputs are needed
# (and convolutions if given the case)

# TODO: Make these more memory-efficient, i.e. factoring FKTables and separate Mask layer...
if spec_dict["positivity"]:
# Positivity (and integrability, which is a special kind of positivity...)
# enters only at the "training" part of the models
obs_layer_tr = Obs_Layer(
dataset_dict["fktables"],
dataset_dict["tr_fktables"],
operation_name,
name=f"dat_{dataset_name}",
name=f"dat_{dataset_name}_{name_suffix}",
)
obs_layer_ex = obs_layer_vl = None
elif spec_dict.get("data_transformation_tr") is not None:
Expand All @@ -170,27 +170,27 @@ def observable_generator(
dataset_dict["fktables"],
dataset_dict["ex_fktables"],
operation_name,
name=f"exp_{dataset_name}",
name=f"exp_{dataset_name}_{name_suffix}",
)
obs_layer_tr = obs_layer_vl = obs_layer_ex
else:
obs_layer_tr = Obs_Layer(
dataset_dict["fktables"],
dataset_dict["tr_fktables"],
operation_name,
name=f"dat_{dataset_name}",
name=f"dat_{dataset_name}_{name_suffix}",
)
obs_layer_ex = Obs_Layer(
dataset_dict["fktables"],
dataset_dict["ex_fktables"],
operation_name,
name=f"exp_{dataset_name}",
name=f"exp_{dataset_name}_{name_suffix}",
)
obs_layer_vl = Obs_Layer(
dataset_dict["fktables"],
dataset_dict["vl_fktables"],
operation_name,
name=f"val_{dataset_name}",
name=f"val_{dataset_name}_{name_suffix}",
)

# To know how many xpoints we compute we are duplicating functionality from obs_layer
Expand All @@ -210,7 +210,7 @@ def observable_generator(
full_nx = sum(dataset_xsizes)
if spec_dict["positivity"]:
out_positivity = ObservableWrapper(
spec_name,
f"{spec_name}_{name_suffix}",
model_obs_tr,
dataset_xsizes,
multiplier=positivity_initial,
Expand All @@ -235,23 +235,23 @@ def observable_generator(
obsrot_vl = None

out_tr = ObservableWrapper(
spec_name,
f"{spec_name}_{name_suffix}",
model_obs_tr,
dataset_xsizes,
invcovmat=spec_dict["invcovmat"],
data=spec_dict["expdata"],
rotation=obsrot_tr,
)
out_vl = ObservableWrapper(
f"{spec_name}_val",
f"{spec_name}_{name_suffix}_val",
model_obs_vl,
dataset_xsizes,
invcovmat=spec_dict["invcovmat_vl"],
data=spec_dict["expdata_vl"],
rotation=obsrot_vl,
)
out_exp = ObservableWrapper(
f"{spec_name}_exp",
f"{spec_name}_{name_suffix}_exp",
model_obs_ex,
dataset_xsizes,
invcovmat=spec_dict["invcovmat_true"],
Expand Down
Loading